From a71790bb1820742806fdbc11513abfaec7a141f0 Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Tue, 3 Jun 2014 16:40:44 +0200 Subject: [PATCH] +per #15229 defer for PersistentActor * Deferred events are not persisted, thus will not participate in replays etc. If users want events to be persisted, they can simply use `persistAsync` instead. * This, 3rd, rewrite extends the Persistent hierarchy by a top level trait "Resequenceable", which is used to mark every event to be sent in sequence back to the PersistentActor. These are split into NonPersistentRepr or PersistentRepr, and acted upon accordingly. * defer is guaranteed to be called, even after persistence failures * Includes docs updates for java/scala/java8 Resolves #15229 Depends on #15227 Conflicts: akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala akka-persistence/src/main/scala/akka/persistence/Processor.scala akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala project/AkkaBuild.scala --- .../main/scala/akka/persistence/Common.scala | 15 ++ .../PersistenceActorDeferBenchmark.scala | 140 ++++++++++ ...k.scala => PersistentActorBenchmark.scala} | 11 +- .../docs/persistence/PersistenceDocTest.java | 55 +++- akka-docs/rst/java/lambda-persistence.rst | 60 ++++- akka-docs/rst/java/persistence.rst | 24 ++ .../docs/persistence/PersistenceDocSpec.scala | 88 ++++++- akka-docs/rst/scala/persistence.rst | 28 +- .../scala/akka/persistence/Eventsourced.scala | 245 ++++++++++++++---- .../akka/persistence/JournalProtocol.scala | 4 +- .../scala/akka/persistence/Persistent.scala | 50 ++-- .../scala/akka/persistence/Processor.scala | 27 +- .../journal/AsyncWriteJournal.scala | 15 +- .../journal/SyncWriteJournal.scala | 16 +- .../journal/WriteJournalBase.scala | 24 ++ .../serialization/MessageSerializer.scala | 4 +- .../persistence/PersistentActorSpec.scala | 143 ++++++++-- .../java/doc/LambdaPersistenceDocTest.java | 111 +++++++- .../persistence/PersistentActorExample.java | 4 +- 19 files changed, 905 insertions(+), 159 deletions(-) create mode 100644 akka-bench-jmh/src/main/scala/akka/persistence/Common.scala create mode 100644 akka-bench-jmh/src/main/scala/akka/persistence/PersistenceActorDeferBenchmark.scala rename akka-bench-jmh/src/main/scala/akka/persistence/{PersistentActorThroughputBenchmark.scala => PersistentActorBenchmark.scala} (93%) create mode 100644 akka-persistence/src/main/scala/akka/persistence/journal/WriteJournalBase.scala diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/Common.scala b/akka-bench-jmh/src/main/scala/akka/persistence/Common.scala new file mode 100644 index 0000000000..391aff526d --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/persistence/Common.scala @@ -0,0 +1,15 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.persistence + +import akka.actor.Actor + +/** only as a "the best we could possibly get" baseline, does not persist anything */ +class BaselineActor(respondAfter: Int) extends Actor { + override def receive = { + case n: Int => if (n == respondAfter) sender() ! n + } +} + +final case class Evt(i: Int) diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/PersistenceActorDeferBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/persistence/PersistenceActorDeferBenchmark.scala new file mode 100644 index 0000000000..788c848e7a --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/persistence/PersistenceActorDeferBenchmark.scala @@ -0,0 +1,140 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.persistence + +import org.openjdk.jmh.annotations._ +import org.openjdk.jmh._ +import com.typesafe.config.ConfigFactory +import akka.actor._ +import akka.testkit.TestProbe +import java.io.File +import org.apache.commons.io.FileUtils +import org.openjdk.jmh.annotations.Scope + +/* + # OS: OSX 10.9.3 + # CPU: Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz + # Date: Mon Jun 9 13:22:42 CEST 2014 + + [info] Benchmark Mode Samples Mean Mean error Units + [info] a.p.PersistentActorDeferBenchmark.tell_persistAsync_defer_persistAsync_reply thrpt 10 6.858 0.515 ops/ms + [info] a.p.PersistentActorDeferBenchmark.tell_persistAsync_defer_persistAsync_replyASAP thrpt 10 20.256 2.941 ops/ms + [info] a.p.PersistentActorDeferBenchmark.tell_processor_Persistent_reply thrpt 10 6.531 0.114 ops/ms + [info] a.p.PersistentActorDeferBenchmark.tell_processor_Persistent_replyASAP thrpt 10 26.000 0.694 ops/ms + */ +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.Throughput)) +class PersistentActorDeferBenchmark { + + val config = PersistenceSpec.config("leveldb", "benchmark") + + lazy val storageLocations = List( + "akka.persistence.journal.leveldb.dir", + "akka.persistence.journal.leveldb-shared.store.dir", + "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + + var system: ActorSystem = _ + + var probe: TestProbe = _ + var processor: ActorRef = _ + var processor_replyASAP: ActorRef = _ + var persistAsync_defer: ActorRef = _ + var persistAsync_defer_replyASAP: ActorRef = _ + + val data10k = (1 to 10000).toArray + + @Setup + def setup() { + system = ActorSystem("test", config) + + probe = TestProbe()(system) + + storageLocations.foreach(FileUtils.deleteDirectory) + processor = system.actorOf(Props(classOf[`processor, forward Persistent, like defer`], data10k.last), "p-1") + processor_replyASAP = system.actorOf(Props(classOf[`processor, forward Persistent, reply ASAP`], data10k.last), "p-2") + persistAsync_defer = system.actorOf(Props(classOf[`persistAsync, defer`], data10k.last), "a-1") + persistAsync_defer_replyASAP = system.actorOf(Props(classOf[`persistAsync, defer, respond ASAP`], data10k.last), "a-2") + } + + @TearDown + def shutdown() { + system.shutdown() + system.awaitTermination() + + storageLocations.foreach(FileUtils.deleteDirectory) + } + + @GenerateMicroBenchmark + @OperationsPerInvocation(10000) + def tell_processor_Persistent_reply() { + for (i <- data10k) processor.tell(i, probe.ref) + + probe.expectMsg(data10k.last) + } + + @GenerateMicroBenchmark + @OperationsPerInvocation(10000) + def tell_processor_Persistent_replyASAP() { + for (i <- data10k) processor_replyASAP.tell(i, probe.ref) + + probe.expectMsg(data10k.last) + } + + @GenerateMicroBenchmark + @OperationsPerInvocation(10000) + def tell_persistAsync_defer_persistAsync_reply() { + for (i <- data10k) persistAsync_defer.tell(i, probe.ref) + + probe.expectMsg(data10k.last) + } + + @GenerateMicroBenchmark + @OperationsPerInvocation(10000) + def tell_persistAsync_defer_persistAsync_replyASAP() { + for (i <- data10k) persistAsync_defer_replyASAP.tell(i, probe.ref) + + probe.expectMsg(data10k.last) + } + +} + +class `processor, forward Persistent, like defer`(respondAfter: Int) extends Processor { + def receive = { + case n: Int => + self forward Persistent(Evt(n)) + self forward Evt(n) + case Persistent(p) => // ignore + case Evt(n) if n == respondAfter => sender() ! respondAfter + } +} +class `processor, forward Persistent, reply ASAP`(respondAfter: Int) extends Processor { + def receive = { + case n: Int => + self forward Persistent(Evt(n)) + if (n == respondAfter) sender() ! respondAfter + case _ => // ignore + } +} + +class `persistAsync, defer`(respondAfter: Int) extends PersistentActor { + override def receiveCommand = { + case n: Int => + persistAsync(Evt(n)) { e => } + defer(Evt(n)) { e => if (e.i == respondAfter) sender() ! e.i } + } + override def receiveRecover = { + case _ => // do nothing + } +} +class `persistAsync, defer, respond ASAP`(respondAfter: Int) extends PersistentActor { + override def receiveCommand = { + case n: Int => + persistAsync(Evt(n)) { e => } + defer(Evt(n)) { e => } + if (n == respondAfter) sender() ! n + } + override def receiveRecover = { + case _ => // do nothing + } +} diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorThroughputBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorBenchmark.scala similarity index 93% rename from akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorThroughputBenchmark.scala rename to akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorBenchmark.scala index 71804d6ede..39f6702b71 100644 --- a/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorThroughputBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorBenchmark.scala @@ -61,7 +61,7 @@ class PersistentActorThroughputBenchmark { def tell_normalActor_reply_baseline() { for (i <- data10k) actor.tell(i, probe.ref) - probe.expectMsg(Evt(data10k.last)) + probe.expectMsg(data10k.last) } @GenerateMicroBenchmark @@ -95,9 +95,9 @@ class PersistentActorThroughputBenchmark { probe.expectMsg(Evt(data10k.last)) } + } -final case class Evt(i: Int) class Persist1EventPersistentActor(respondAfter: Int) extends PersistentActor { override def receiveCommand = { @@ -135,10 +135,3 @@ class PersistAsync1EventQuickReplyPersistentActor(respondAfter: Int) extends Per case _ => // do nothing } } - -/** only as a "the best we could possibly get" baseline, does not persist anything */ -class BaselineActor(respondAfter: Int) extends Actor { - override def receive = { - case n: Int => if (n == respondAfter) sender() ! Evt(n) - } -} \ No newline at end of file diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index c18ed62c07..4f70f830fc 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -319,9 +319,9 @@ public class PersistenceDocTest { final ActorRef processor = system.actorOf(Props.create(MyProcessor.class)); public void batchWrite() { - processor.tell(PersistentBatch.create(asList( - Persistent.create("a"), - Persistent.create("b"))), null); + processor.tell(PersistentBatch.create(asList( + Persistent.create("a"), + Persistent.create("b"))), null); } // ... @@ -423,7 +423,7 @@ public class PersistenceDocTest { public void usage() { final ActorSystem system = ActorSystem.create("example"); - //#view-update + //#persist-async-usage final ActorRef processor = system.actorOf(Props.create(MyPersistentActor.class)); processor.tell("a", null); processor.tell("b", null); @@ -435,11 +435,56 @@ public class PersistenceDocTest { // evt-a-2 // evt-b-1 // evt-b-2 - //#view-update + //#persist-async-usage } }; static Object o10 = new Object() { + //#defer + class MyPersistentActor extends UntypedPersistentActor { + + @Override + public void onReceiveRecover(Object msg) { + // handle recovery here + } + + @Override + public void onReceiveCommand(Object msg) { + final Procedure replyToSender = new Procedure() { + @Override + public void apply(String event) throws Exception { + sender().tell(event, self()); + } + }; + + persistAsync(String.format("evt-%s-1", msg), replyToSender); + persistAsync(String.format("evt-%s-2", msg), replyToSender); + defer(String.format("evt-%s-3", msg), replyToSender); + } + } + //#defer + + public void usage() { + final ActorSystem system = ActorSystem.create("example"); + //#defer-caller + final ActorRef processor = system.actorOf(Props.create(MyPersistentActor.class)); + processor.tell("a", null); + processor.tell("b", null); + + // order of received messages: + // a + // b + // evt-a-1 + // evt-a-2 + // evt-a-3 + // evt-b-1 + // evt-b-2 + // evt-b-3 + //#defer-caller + } + }; + + static Object o11 = new Object() { //#view class MyView extends UntypedView { @Override diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 04caf62fcc..4745b763e1 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -40,9 +40,14 @@ Akka persistence is a separate jar file. Make sure that you have the following d Architecture ============ -* *Processor*: A processor is a persistent, stateful actor. Messages sent to a processor are written to a journal - before its behavior is called. When a processor is started or restarted, journaled messages are replayed - to that processor, so that it can recover internal state from these messages. +* *Processor* (deprecated, use *PersistentActor* instead): A processor is a persistent, stateful actor. Messages sent + to a processor are written to a journal before its ``onReceive`` method is called. When a processor is started or + restarted, journaled messages are replayed to that processor, so that it can recover internal state from these messages. + +* *PersistentActor*: Is a persistent, stateful actor. It is able to persist events to a journal and can react to + them in a thread-safe manner. It can be used to implement both *command* as well as *event sourced* actors. + When a persistent actor is started or restarted, journaled messages are replayed to that actor, so that it can + recover internal state from these messages. * *View*: A view is a persistent, stateful actor that receives journaled messages that have been written by another processor. A view itself does not journal new messages, instead, it updates internal state only from a processor's @@ -427,6 +432,11 @@ use the ``deleteSnapshots`` method. Event sourcing ============== +.. note:: + The ``PersistentActor`` introduced in this section was previously known as ``EventsourcedProcessor``, + which was a subset of the ``PersistentActor``. Migrating your code to use persistent actors instead is + very simple and is explained in the :ref:`migration-guide-persistence-experimental-2.3.x-2.4.x`. + In all the examples so far, messages that change a processor's state have been sent as ``Persistent`` messages by an application, so that they can be replayed during recovery. From this point of view, the journal acts as a write-ahead-log for whatever ``Persistent`` messages a processor receives. This is also known as *command @@ -484,6 +494,50 @@ It contains instructions on how to run the ``PersistentActorExample``. recovery you need to take special care to perform the same state transitions with ``become`` and ``unbecome`` in the ``receiveRecover`` method as you would have done in the command handler. +Relaxed local consistency requirements and high throughput use-cases +-------------------------------------------------------------------- + +If faced with Relaxed local consistency requirements and high throughput demands sometimes ``PersistentActor`` and it's +``persist`` may not be enough in terms of consuming incoming Commands at a high rate, because it has to wait until all +Events related to a given Command are processed in order to start processing the next Command. While this abstraction is +very useful for most cases, sometimes you may be faced with relaxed requirements about consistency – for example you may +want to process commands as fast as you can, assuming that Event will eventually be persisted and handled properly in +the background and retroactively reacting to persistence failures if needed. + +The ``persistAsync`` method provides a tool for implementing high-throughput processors. It will *not* +stash incoming Commands while the Journal is still working on persisting and/or user code is executing event callbacks. + +In the below example, the event callbacks may be called "at any time", even after the next Command has been processed. +The ordering between events is still guaranteed ("evt-b-1" will be sent after "evt-a-2", which will be sent after "evt-a-1" etc.). + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#persist-async + +Notice that the client does not have to wrap any messages in the `Persistent` class in order to obtain "command sourcing like" +semantics. It's up to the processor to decide about persisting (or not) of messages, unlike ``Processor`` where the sender had to be aware of this decision. + +.. note:: + In order to implement the pattern known as "*command sourcing*" simply ``persistAsync`` all incoming events right away, + and handle them in the callback. + +.. _defer-java-lambda: + +Deferring actions until preceeding persist handlers have executed +----------------------------------------------------------------- + +Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of +''happens-after the previous ``persistAsync`` handlers have been invoked''. ``PersistentActor`` provides an utility method +called ``defer``, which works similarily to ``persistAsync`` yet does not persist the passed in event. It is recommended to +use it for *read* operations, and actions which do not have corresponding events in your domain model. + +Using this method is very similar to the persist family of methods, yet it does **not** persist the passed in event. +It will be kept in memory and used when invoking the handler. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#defer + +Notice that the ``sender()`` is **safe** to access in the handler callback, and will be pointing to the original sender +of the command for which this ``defer`` handler was called. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#defer-caller Reliable event delivery ----------------------- diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 77c437cfb9..804b1b1aae 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -546,10 +546,34 @@ The ordering between events is still guaranteed ("evt-b-1" will be sent after "e .. includecode:: code/docs/persistence/PersistenceDocTest.java#persist-async +Notice that the client does not have to wrap any messages in the `Persistent` class in order to obtain "command sourcing like" +semantics. It's up to the processor to decide about persisting (or not) of messages, unlike ``Processor`` where the sender had to be aware of this decision. + .. note:: In order to implement the pattern known as "*command sourcing*" simply ``persistAsync`` all incoming events right away, and handle them in the callback. +.. _defer-java: + +Deferring actions until preceeding persist handlers have executed +----------------------------------------------------------------- + +Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of +''happens-after the previous ``persistAsync`` handlers have been invoked''. ``PersistentActor`` provides an utility method +called ``defer``, which works similarily to ``persistAsync`` yet does not persist the passed in event. It is recommended to +use it for *read* operations, and actions which do not have corresponding events in your domain model. + +Using this method is very similar to the persist family of methods, yet it does **not** persist the passed in event. +It will be kept in memory and used when invoking the handler. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#defer + +Notice that the ``sender()`` is **safe** to access in the handler callback, and will be pointing to the original sender +of the command for which this ``defer`` handler was called. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#defer-caller + + Reliable event delivery ----------------------- diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index c054e40e0a..7f6d5d614f 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -23,7 +23,7 @@ trait PersistenceDocSpec { trait SomeOtherMessage - val system: ActorSystem + implicit val system: ActorSystem import system._ @@ -33,11 +33,11 @@ trait PersistenceDocSpec { class MyProcessor extends Processor { def receive = { - case Persistent(payload, sequenceNr) => + case Persistent(payload, sequenceNr) => // message successfully written to journal case PersistenceFailure(payload, sequenceNr, cause) => // message failed to be written to journal - case m: SomeOtherMessage => + case m: SomeOtherMessage => // message not written to journal } } @@ -367,6 +367,45 @@ trait PersistenceDocSpec { //#persist-async } + new AnyRef { + import akka.actor.ActorRef + + val processor = system.actorOf(Props[MyPersistentActor]()) + + //#defer + class MyPersistentActor extends PersistentActor { + + def receiveRecover: Receive = { + case _ => // handle recovery here + } + + def receiveCommand: Receive = { + case c: String => { + sender() ! c + persistAsync(s"evt-$c-1") { e => sender() ! e } + persistAsync(s"evt-$c-2") { e => sender() ! e } + defer(s"evt-$c-3") { e => sender() ! e } + } + } + } + //#defer + + //#defer-caller + processor ! "a" + processor ! "b" + + // order of received messages: + // a + // b + // evt-a-1 + // evt-a-2 + // evt-a-3 + // evt-b-1 + // evt-b-2 + // evt-b-3 + + //#defer-caller + } new AnyRef { import akka.actor.Props @@ -385,4 +424,47 @@ trait PersistenceDocSpec { view ! Update(await = true) //#view-update } + + new AnyRef { + // ------------------------------------------------------------------------------------------------ + // FIXME: uncomment once going back to project dependencies (in akka-stream-experimental) + // ------------------------------------------------------------------------------------------------ + /* + //#producer-creation + import org.reactivestreams.api.Producer + + import akka.persistence.Persistent + import akka.persistence.stream.{ PersistentFlow, PersistentPublisherSettings } + import akka.stream.{ FlowMaterializer, MaterializerSettings } + import akka.stream.scaladsl.Flow + + val materializer = FlowMaterializer(MaterializerSettings()) + + val flow: Flow[Persistent] = PersistentFlow.fromProcessor("some-processor-id") + val producer: Producer[Persistent] = flow.toProducer(materializer) + //#producer-creation + + //#producer-buffer-size + PersistentFlow.fromProcessor("some-processor-id", PersistentPublisherSettings(maxBufferSize = 200)) + //#producer-buffer-size + + //#producer-examples + // 1 producer and 2 consumers: + val producer1: Producer[Persistent] = + PersistentFlow.fromProcessor("processor-1").toProducer(materializer) + Flow(producer1).foreach(p => println(s"consumer-1: ${p.payload}")).consume(materializer) + Flow(producer1).foreach(p => println(s"consumer-2: ${p.payload}")).consume(materializer) + + // 2 producers (merged) and 1 consumer: + val producer2: Producer[Persistent] = + PersistentFlow.fromProcessor("processor-2").toProducer(materializer) + val producer3: Producer[Persistent] = + PersistentFlow.fromProcessor("processor-3").toProducer(materializer) + Flow(producer2).merge(producer3).foreach { p => + println(s"consumer-3: ${p.payload}") + }.consume(materializer) + //#producer-examples + */ + } + } diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 3325bc4b2d..2fa7805d86 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -260,7 +260,7 @@ request instructs a channel to send a ``Persistent`` message to a destination. preserved by a channel, therefore, a destination can reply to the sender of a ``Deliver`` request. .. note:: - + Sending via a channel has at-least-once delivery semantics—by virtue of either the sending actor or the channel being persistent—which means that the semantics do not match those of a normal :class:`ActorRef` send operation: @@ -558,14 +558,34 @@ The ordering between events is still guaranteed ("evt-b-1" will be sent after "e .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#persist-async Notice that the client does not have to wrap any messages in the `Persistent` class in order to obtain "command sourcing like" -semantics. It's up to the processor to decide about persisting (or not) of messages, unlike ``Processor`` where this decision -was made by the sender. - +semantics. It's up to the processor to decide about persisting (or not) of messages, unlike ``Processor`` where the sender had to be aware of this decision. .. note:: In order to implement the "*command sourcing*" simply call ``persistAsync(cmd)(...)`` right away on all incomming messages right away, and handle them in the callback. +.. _defer-scala: + +Deferring actions until preceeding persist handlers have executed +----------------------------------------------------------------- + +Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of +''happens-after the previous ``persistAsync`` handlers have been invoked''. ``PersistentActor`` provides an utility method +called ``defer``, which works similarily to ``persistAsync`` yet does not persist the passed in event. It is recommended to +use it for *read* operations, and actions which do not have corresponding events in your domain model. + +Using this method is very similar to the persist family of methods, yet it does **not** persist the passed in event. +It will be kept in memory and used when invoking the handler. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#defer + +Notice that the ``sender()`` is **safe** to access in the handler callback, and will be pointing to the original sender +of the command for which this ``defer`` handler was called. + +The calling side will get the responses in this (guaranteed) order: + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#defer-caller + Reliable event delivery ----------------------- diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 77324901d3..ac8a05b658 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -10,7 +10,7 @@ import scala.collection.immutable import akka.japi.{ Procedure, Util } import akka.persistence.JournalProtocol._ -import akka.actor.AbstractActor +import akka.actor.{ ActorRef, AbstractActor } /** * INTERNAL API. @@ -58,8 +58,15 @@ private[persistence] trait Eventsourced extends Processor { throw new UnsupportedOperationException("Persistent command batches not supported") case _: PersistentRepr ⇒ throw new UnsupportedOperationException("Persistent commands not supported") - case WriteMessageSuccess(p) ⇒ - withCurrentPersistent(p)(p ⇒ persistInvocations.get(0).handler(p.payload)) + case WriteMessageSuccess(r) ⇒ + r match { + case p: PersistentRepr ⇒ + withCurrentPersistent(p)(p ⇒ pendingInvocations.peek().handler(p.payload)) + case _ ⇒ pendingInvocations.peek().handler(r.payload) + } + onWriteComplete() + case LoopMessageSuccess(l) ⇒ + pendingInvocations.peek().handler(l) onWriteComplete() case s @ WriteMessagesSuccessful ⇒ Eventsourced.super.aroundReceive(receive, s) case f: WriteMessagesFailed ⇒ Eventsourced.super.aroundReceive(receive, f) @@ -74,28 +81,13 @@ private[persistence] trait Eventsourced extends Processor { currentState = persistingEvents } - if (persistentEventBatch.nonEmpty) { - Eventsourced.super.aroundReceive(receive, PersistentBatch(persistentEventBatch.reverse)) - persistentEventBatch = Nil - } else { - processorStash.unstash() - } + if (resequenceableEventBatch.nonEmpty) flushBatch() + else processorStash.unstash() } private def onWriteComplete(): Unit = { - persistInvocations.remove(0) - - val nextIsStashing = !persistInvocations.isEmpty && persistInvocations.get(0).isInstanceOf[StashingPersistInvocation] - - if (nextIsStashing) { - currentState = persistingEvents - } - - if (persistInvocations.isEmpty) { - processorStash.unstash() - } + pendingInvocations.pop() } - } /** @@ -106,37 +98,48 @@ private[persistence] trait Eventsourced extends Processor { private val persistingEvents: State = new State { override def toString: String = "persisting events" - def aroundReceive(receive: Receive, message: Any) = message match { + def aroundReceive(receive: Receive, message: Any): Unit = message match { case _: ConfirmablePersistent ⇒ processorStash.stash() case PersistentBatch(b) ⇒ - b.foreach(p ⇒ deleteMessage(p.sequenceNr, permanent = true)) + b foreach { + case p: PersistentRepr ⇒ deleteMessage(p.sequenceNr, permanent = true) + case r ⇒ // ignore, nothing to delete (was not a persistent message) + } throw new UnsupportedOperationException("Persistent command batches not supported") case p: PersistentRepr ⇒ deleteMessage(p.sequenceNr, permanent = true) throw new UnsupportedOperationException("Persistent commands not supported") - case WriteMessageSuccess(p) ⇒ - val invocation = persistInvocations.get(0) - withCurrentPersistent(p)(p ⇒ invocation.handler(p.payload)) - onWriteComplete(invocation) + + case WriteMessageSuccess(m) ⇒ + m match { + case p: PersistentRepr ⇒ withCurrentPersistent(p)(p ⇒ pendingInvocations.peek().handler(p.payload)) + case _ ⇒ pendingInvocations.peek().handler(m.payload) + } + onWriteComplete() + case e @ WriteMessageFailure(p, _) ⇒ Eventsourced.super.aroundReceive(receive, message) // stops actor by default - onWriteComplete(persistInvocations.get(0)) + onWriteComplete() + case LoopMessageSuccess(l) ⇒ + pendingInvocations.peek().handler(l) + onWriteComplete() case s @ WriteMessagesSuccessful ⇒ Eventsourced.super.aroundReceive(receive, s) case f: WriteMessagesFailed ⇒ Eventsourced.super.aroundReceive(receive, f) case other ⇒ processorStash.stash() } - private def onWriteComplete(invocation: PersistInvocation): Unit = { - if (invocation.isInstanceOf[StashingPersistInvocation]) { - // enables an early return to `processingCommands`, because if this counter hits `0`, - // we know the remaining persistInvocations are all `persistAsync` created, which - // means we can go back to processing commands also - and these callbacks will be called as soon as possible - pendingStashingPersistInvocations -= 1 + private def onWriteComplete(): Unit = { + pendingInvocations.pop() match { + case _: StashingHandlerInvocation ⇒ + // enables an early return to `processingCommands`, because if this counter hits `0`, + // we know the remaining pendingInvocations are all `persistAsync` created, which + // means we can go back to processing commands also - and these callbacks will be called as soon as possible + pendingStashingPersistInvocations -= 1 + case _ ⇒ // do nothing } - persistInvocations.remove(0) - if (persistInvocations.isEmpty || pendingStashingPersistInvocations == 0) { + if (pendingStashingPersistInvocations == 0) { currentState = processingCommands processorStash.unstash() } @@ -161,23 +164,29 @@ private[persistence] trait Eventsourced extends Processor { receiveRecover(RecoveryCompleted) } - sealed trait PersistInvocation { + sealed trait PendingHandlerInvocation { + def evt: Any def handler: Any ⇒ Unit } /** forces processor to stash incoming commands untill all these invocations are handled */ - final case class StashingPersistInvocation(evt: Any, handler: Any ⇒ Unit) extends PersistInvocation - /** does not force the processor to stash commands */ - final case class AsyncPersistInvocation(evt: Any, handler: Any ⇒ Unit) extends PersistInvocation + final case class StashingHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation + /** does not force the processor to stash commands; Originates from either `persistAsync` or `defer` calls */ + final case class AsyncHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation - /** Used instead of iterating `persistInvocations` in order to check if safe to revert to processing commands */ + /** Used instead of iterating `pendingInvocations` in order to check if safe to revert to processing commands */ private var pendingStashingPersistInvocations: Long = 0 /** Holds user-supplied callbacks for persist/persistAsync calls */ - private val persistInvocations = new java.util.LinkedList[PersistInvocation]() // we only append / isEmpty / get(0) on it - private var persistentEventBatch: List[PersistentRepr] = Nil + private val pendingInvocations = new java.util.LinkedList[PendingHandlerInvocation]() // we only append / isEmpty / get(0) on it + private var resequenceableEventBatch: List[Resequenceable] = Nil private var currentState: State = recovering private val processorStash = createStash() + private def flushBatch() { + Eventsourced.super.aroundReceive(receive, PersistentBatch(resequenceableEventBatch.reverse)) + resequenceableEventBatch = Nil + } + /** * Asynchronously persists `event`. On successful persistence, `handler` is called with the * persisted event. It is guaranteed that no new commands will be received by a processor @@ -202,8 +211,8 @@ private[persistence] trait Eventsourced extends Processor { */ final def persist[A](event: A)(handler: A ⇒ Unit): Unit = { pendingStashingPersistInvocations += 1 - persistInvocations addLast StashingPersistInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) - persistentEventBatch = PersistentRepr(event) :: persistentEventBatch + pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + resequenceableEventBatch = PersistentRepr(event) :: resequenceableEventBatch } /** @@ -237,8 +246,8 @@ private[persistence] trait Eventsourced extends Processor { * @param handler handler for each persisted `event` */ final def persistAsync[A](event: A)(handler: A ⇒ Unit): Unit = { - persistInvocations addLast AsyncPersistInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) - persistentEventBatch = PersistentRepr(event) :: persistentEventBatch + pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + resequenceableEventBatch = PersistentRepr(event) :: resequenceableEventBatch } /** @@ -252,6 +261,56 @@ private[persistence] trait Eventsourced extends Processor { final def persistAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = events.foreach(persistAsync(_)(handler)) + /** + * Defer the handler execution until all pending handlers have been executed. + * Allows to define logic within the actor, which will respect the invocation-order-guarantee + * in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer, + * the corresponding handlers will be invoked in the same order as they were registered in. + * + * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, + * if the given event should possible to replay. + * + * If there are no pending persist handler calls, the handler will be called immediatly. + * + * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the + * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. + * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers + * will not be run. + * + * @param event event to be handled in the future, when preceeding persist operations have been processes + * @param handler handler for the given `event` + */ + final def defer[A](event: A)(handler: A ⇒ Unit): Unit = { + if (pendingInvocations.isEmpty) { + handler(event) + } else { + pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + resequenceableEventBatch = NonPersistentRepr(event, sender()) :: resequenceableEventBatch + } + } + + /** + * Defer the handler execution until all pending handlers have been executed. + * Allows to define logic within the actor, which will respect the invocation-order-guarantee + * in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer, + * the corresponding handlers will be invoked in the same order as they were registered in. + * + * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, + * if the given event should possible to replay. + * + * If there are no pending persist handler calls, the handler will be called immediatly. + * + * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the + * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. + * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers + * will not be run. + * + * @param events event to be handled in the future, when preceeding persist operations have been processes + * @param handler handler for each `event` + */ + final def defer[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = + events.foreach(defer(_)(handler)) + /** * Recovery handler that receives persisted events during recovery. If a state snapshot * has been captured and saved, this handler will receive a [[SnapshotOffer]] message @@ -425,6 +484,50 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events final def persistAsync[A](events: JIterable[A])(handler: A ⇒ Unit): Unit = super[Eventsourced].persistAsync(Util.immutableSeq(events))(event ⇒ handler(event)) + /** + * Defer the handler execution until all pending handlers have been executed. + * Allows to define logic within the actor, which will respect the invocation-order-guarantee + * in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer, + * the corresponding handlers will be invoked in the same order as they were registered in. + * + * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, + * if the given event should possible to replay. + * + * If there are no pending persist handler calls, the handler will be called immediatly. + * + * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the + * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. + * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers + * will not be run. + * + * @param event event to be handled in the future, when preceeding persist operations have been processes + * @param handler handler for the given `event` + */ + final def defer[A](event: A)(handler: Procedure[A]): Unit = + super[Eventsourced].defer(event)(event ⇒ handler(event)) + + /** + * Defer the handler execution until all pending handlers have been executed. + * Allows to define logic within the actor, which will respect the invocation-order-guarantee + * in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer, + * the corresponding handlers will be invoked in the same order as they were registered in. + * + * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, + * if the given event should possible to replay. + * + * If there are no pending persist handler calls, the handler will be called immediatly. + * + * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the + * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. + * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers + * will not be run. + * + * @param events event to be handled in the future, when preceeding persist operations have been processes + * @param handler handler for each `event` + */ + final def defer[A](events: JIterable[A])(handler: Procedure[A]): Unit = + super[Eventsourced].defer(Util.immutableSeq(events))(event ⇒ handler(event)) + /** * Java API: recovery handler that receives persisted events during recovery. If a state snapshot * has been captured and saved, this handler will receive a [[SnapshotOffer]] message @@ -446,7 +549,7 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events * communication with other actors). On successful validation, one or more events are * derived from a command and these events are then persisted by calling `persist`. * Commands sent to event sourced processors must not be [[Persistent]] or - * [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is + * [[ResequenceableBatch]] messages. In this case an `UnsupportedOperationException` is * thrown by the processor. */ def onReceiveCommand(msg: Any): Unit @@ -458,7 +561,7 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events * communication with other actors). On successful validation, one or more events are * derived from a command and these events are then persisted by calling `persist`. * Commands sent to event sourced processors must not be [[Persistent]] or - * [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is + * [[ResequenceableBatch]] messages. In this case an `UnsupportedOperationException` is * thrown by the processor. */ @deprecated("AbstractEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4") @@ -516,6 +619,50 @@ abstract class AbstractEventsourcedProcessor extends AbstractActor with Eventsou final def persistAsync[A](event: A, handler: Procedure[A]): Unit = persistAsync(event)(event ⇒ handler(event)) + /** + * Defer the handler execution until all pending handlers have been executed. + * Allows to define logic within the actor, which will respect the invocation-order-guarantee + * in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer, + * the corresponding handlers will be invoked in the same order as they were registered in. + * + * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, + * if the given event should possible to replay. + * + * If there are no pending persist handler calls, the handler will be called immediatly. + * + * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the + * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. + * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers + * will not be run. + * + * @param event event to be handled in the future, when preceeding persist operations have been processes + * @param handler handler for the given `event` + */ + final def defer[A](event: A)(handler: Procedure[A]): Unit = + super.defer(event)(event ⇒ handler(event)) + + /** + * Defer the handler execution until all pending handlers have been executed. + * Allows to define logic within the actor, which will respect the invocation-order-guarantee + * in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer, + * the corresponding handlers will be invoked in the same order as they were registered in. + * + * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, + * if the given event should possible to replay. + * + * If there are no pending persist handler calls, the handler will be called immediatly. + * + * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the + * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. + * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers + * will not be run. + * + * @param events event to be handled in the future, when preceeding persist operations have been processes + * @param handler handler for each `event` + */ + final def defer[A](events: JIterable[A])(handler: Procedure[A]): Unit = + super.defer(Util.immutableSeq(events))(event ⇒ handler(event)) + /** * Java API: asynchronously persists `events` in specified order. This is equivalent to calling * `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`, diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index 030554cfd2..af60f07348 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -8,8 +8,6 @@ import scala.collection.immutable import akka.actor._ -import akka.persistence.serialization.Message - /** * INTERNAL API. * @@ -60,7 +58,7 @@ private[persistence] object JournalProtocol { * @param messages messages to be written. * @param processor write requestor. */ - final case class WriteMessages(messages: immutable.Seq[PersistentRepr], processor: ActorRef) + final case class WriteMessages(messages: immutable.Seq[Resequenceable], processor: ActorRef) /** * Reply message to a successful [[WriteMessages]] request. This reply is sent to the requestor diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index 22cdb268b4..2c957c4156 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -15,12 +15,21 @@ import akka.pattern.PromiseActorRef import akka.persistence.serialization.Message /** - * Persistent message. + * Marks messages which can be resequenced by the [[akka.persistence.journal.AsyncWriteJournal]]. + * + * In essence it is either an [[NonPersistentRepr]] or [[Persistent]]. */ -@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " + - "which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.", - since = "2.3.4") -sealed abstract class Persistent { +sealed trait Resequenceable { + def payload: Any + def sender: ActorRef +} + +/** Message which can be resequenced by the Journal, but will not be persisted. */ +final case class NonPersistentRepr(payload: Any, sender: ActorRef) extends Resequenceable + +/** Persistent message. */ +@deprecated("Use akka.persistence.PersistentActor instead.", since = "2.3.4") +sealed abstract class Persistent extends Resequenceable { /** * This persistent message's payload. */ @@ -41,9 +50,7 @@ sealed abstract class Persistent { def withPayload(payload: Any): Persistent } -@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " + - "which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.", - since = "2.3.4") +@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4") object Persistent { /** * Java API: creates a new persistent message. Must only be used outside processors. @@ -71,9 +78,7 @@ object Persistent { * @param payload payload of the new persistent message. * @param currentPersistentMessage optional current persistent message, defaults to `None`. */ - @deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " + - "which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.", - since = "2.3.4") + @deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4") def apply(payload: Any)(implicit currentPersistentMessage: Option[Persistent] = None): Persistent = currentPersistentMessage.map(_.withPayload(payload)).getOrElse(PersistentRepr(payload)) @@ -88,9 +93,7 @@ object Persistent { * Persistent message that has been delivered by a [[Channel]] or [[PersistentChannel]]. Channel * destinations that receive messages of this type can confirm their receipt by calling [[confirm]]. */ -@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " + - "which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.", - since = "2.3.4") +@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4") sealed abstract class ConfirmablePersistent extends Persistent { /** * Called by [[Channel]] and [[PersistentChannel]] destinations to confirm the receipt of a @@ -105,9 +108,7 @@ sealed abstract class ConfirmablePersistent extends Persistent { def redeliveries: Int } -@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " + - "which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.", - since = "2.3.4") +@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4") object ConfirmablePersistent { /** * [[ConfirmablePersistent]] extractor. @@ -121,18 +122,7 @@ object ConfirmablePersistent { * journal. The processor receives the written messages individually as [[Persistent]] messages. * During recovery, they are also replayed individually. */ -@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " + - "which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.", - since = "2.3.4") -case class PersistentBatch(persistentBatch: immutable.Seq[Persistent]) extends Message { - // todo while we want to remove Persistent() from user-land, the batch may (probably?) become private[akka] to remain for journal internals #15230 - - /** - * INTERNAL API. - */ - private[persistence] def persistentReprList: List[PersistentRepr] = - persistentBatch.toList.asInstanceOf[List[PersistentRepr]] -} +case class PersistentBatch(batch: immutable.Seq[Resequenceable]) extends Message /** * Plugin API: confirmation entry written by journal plugins. @@ -170,7 +160,7 @@ private[persistence] final case class PersistentIdImpl(processorId: String, sequ * @see [[journal.AsyncWriteJournal]] * @see [[journal.AsyncRecovery]] */ -trait PersistentRepr extends Persistent with PersistentId with Message { +trait PersistentRepr extends Persistent with Resequenceable with PersistentId with Message { // todo we want to get rid of the Persistent() wrapper from user land; PersistentRepr is here to stay. #15230 import scala.collection.JavaConverters._ diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index 4f8feb4f57..dd1e3f980b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -86,10 +86,11 @@ trait Processor extends Actor with Recovery { private var batching = false def aroundReceive(receive: Receive, message: Any) = message match { - case r: Recover ⇒ // ignore - case ReplayedMessage(p) ⇒ processPersistent(receive, p) // can occur after unstash from user stash - case WriteMessageSuccess(p) ⇒ processPersistent(receive, p) - case WriteMessageFailure(p, cause) ⇒ + case r: Recover ⇒ // ignore + case ReplayedMessage(p) ⇒ processPersistent(receive, p) // can occur after unstash from user stash + case WriteMessageSuccess(p: PersistentRepr) ⇒ processPersistent(receive, p) + case WriteMessageSuccess(r: Resequenceable) ⇒ process(receive, r) + case WriteMessageFailure(p, cause) ⇒ process(receive, PersistenceFailure(p.payload, p.sequenceNr, cause)) case LoopMessageSuccess(m) ⇒ process(receive, m) case WriteMessagesSuccessful | WriteMessagesFailed(_) ⇒ @@ -108,11 +109,15 @@ trait Processor extends Actor with Recovery { journal forward LoopMessage(m, self) } - def addToBatch(p: PersistentRepr): Unit = - processorBatch = processorBatch :+ p.update(processorId = processorId, sequenceNr = nextSequenceNr(), sender = sender()) + def addToBatch(p: Resequenceable): Unit = p match { + case p: PersistentRepr ⇒ + processorBatch = processorBatch :+ p.update(processorId = processorId, sequenceNr = nextSequenceNr(), sender = sender()) + case r ⇒ + processorBatch = processorBatch :+ r + } def addToBatch(pb: PersistentBatch): Unit = - pb.persistentReprList.foreach(addToBatch) + pb.batch.foreach(addToBatch) def maxBatchSizeReached: Boolean = processorBatch.length >= extension.settings.journal.maxMessageBatchSize @@ -151,10 +156,10 @@ trait Processor extends Actor with Recovery { */ private def onRecoveryCompleted(receive: Receive): Unit = receive.applyOrElse(RecoveryCompleted, unhandled) - + private val _processorId = extension.processorId(self) - private var processorBatch = Vector.empty[PersistentRepr] + private var processorBatch = Vector.empty[Resequenceable] private var sequenceNr: Long = 0L /** @@ -326,14 +331,14 @@ trait Processor extends Actor with Recovery { * @param cause failure cause. */ @SerialVersionUID(1L) -final case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throwable) +case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throwable) /** * Sent to a [[Processor]] if a journal fails to replay messages or fetch that processor's * highest sequence number. If not handled, the prossor will be stopped. */ @SerialVersionUID(1L) -final case class RecoveryFailure(cause: Throwable) +case class RecoveryFailure(cause: Throwable) abstract class RecoveryCompleted /** diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index fc1a9daccf..7cf0681e8c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -16,7 +16,7 @@ import akka.persistence._ /** * Abstract journal, optimized for asynchronous, non-blocking writes. */ -trait AsyncWriteJournal extends Actor with AsyncRecovery { +trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { import JournalProtocol._ import AsyncWriteJournal._ import context.dispatcher @@ -24,16 +24,17 @@ trait AsyncWriteJournal extends Actor with AsyncRecovery { private val extension = Persistence(context.system) private val publish = extension.settings.internal.publishPluginCommands - private val resequencer = context.actorOf(Props[Resequencer]) + private val resequencer = context.actorOf(Props[Resequencer]()) private var resequencerCounter = 1L def receive = { - case WriteMessages(persistentBatch, processor) ⇒ + case WriteMessages(resequenceables, processor) ⇒ val cctr = resequencerCounter - def resequence(f: PersistentRepr ⇒ Any) = persistentBatch.zipWithIndex.foreach { - case (p, i) ⇒ resequencer ! Desequenced(f(p), cctr + i + 1, processor, p.sender) + def resequence(f: PersistentRepr ⇒ Any) = resequenceables.zipWithIndex.foreach { + case (p: PersistentRepr, i) ⇒ resequencer ! Desequenced(f(p), cctr + i + 1, processor, p.sender) + case (r, i) ⇒ resequencer ! Desequenced(LoopMessageSuccess(r.payload), cctr + i + 1, processor, r.sender) } - asyncWriteMessages(persistentBatch.map(_.prepareWrite())) onComplete { + asyncWriteMessages(preparePersistentBatch(resequenceables)) onComplete { case Success(_) ⇒ resequencer ! Desequenced(WriteMessagesSuccessful, cctr, processor, self) resequence(WriteMessageSuccess(_)) @@ -41,7 +42,7 @@ trait AsyncWriteJournal extends Actor with AsyncRecovery { resequencer ! Desequenced(WriteMessagesFailed(e), cctr, processor, self) resequence(WriteMessageFailure(_, e)) } - resequencerCounter += persistentBatch.length + 1 + resequencerCounter += resequenceables.length + 1 case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, processorId, processor, replayDeleted) ⇒ // Send replayed messages and replay result to processor directly. No need // to resequence replayed messages relative to written and looped messages. diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala index 4ddd433a37..0d43b9e393 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala @@ -15,7 +15,7 @@ import akka.persistence._ /** * Abstract journal, optimized for synchronous writes. */ -trait SyncWriteJournal extends Actor with AsyncRecovery { +trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { import JournalProtocol._ import context.dispatcher @@ -23,14 +23,20 @@ trait SyncWriteJournal extends Actor with AsyncRecovery { private val publish = extension.settings.internal.publishPluginCommands final def receive = { - case WriteMessages(persistentBatch, processor) ⇒ - Try(writeMessages(persistentBatch.map(_.prepareWrite()))) match { + case WriteMessages(resequenceables, processor) ⇒ + Try(writeMessages(preparePersistentBatch(resequenceables))) match { case Success(_) ⇒ processor ! WriteMessagesSuccessful - persistentBatch.foreach(p ⇒ processor.tell(WriteMessageSuccess(p), p.sender)) + resequenceables.foreach { + case p: PersistentRepr ⇒ processor.tell(WriteMessageSuccess(p), p.sender) + case r ⇒ processor.tell(LoopMessageSuccess(r.payload), r.sender) + } case Failure(e) ⇒ processor ! WriteMessagesFailed(e) - persistentBatch.foreach(p ⇒ processor tell (WriteMessageFailure(p, e), p.sender)) + resequenceables.foreach { + case p: PersistentRepr ⇒ processor tell (WriteMessageFailure(p, e), p.sender) + case r ⇒ processor tell (LoopMessageSuccess(r.payload), r.sender) + } throw e } case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, processorId, processor, replayDeleted) ⇒ diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/WriteJournalBase.scala b/akka-persistence/src/main/scala/akka/persistence/journal/WriteJournalBase.scala new file mode 100644 index 0000000000..79d5e685c6 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/WriteJournalBase.scala @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ + +package akka.persistence.journal + +import akka.persistence.{ PersistentRepr, Resequenceable } +import akka.actor.Actor +import scala.collection.immutable + +private[akka] trait WriteJournalBase { + this: Actor ⇒ + + protected def preparePersistentBatch(rb: immutable.Seq[Resequenceable]): immutable.Seq[PersistentRepr] = + rb.filter(persistentPrepareWrite).asInstanceOf[immutable.Seq[PersistentRepr]] // filter instead of flatMap to avoid Some allocations + + private def persistentPrepareWrite(r: Resequenceable): Boolean = r match { + case p: PersistentRepr ⇒ + p.prepareWrite(); true + case _ ⇒ + false + } + +} diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index a2be6b30d5..5b50b7e367 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -86,7 +86,9 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { private def persistentMessageBatchBuilder(persistentBatch: PersistentBatch) = { val builder = PersistentMessageBatch.newBuilder - persistentBatch.persistentReprList.foreach(p ⇒ builder.addBatch(persistentMessageBuilder(p))) + persistentBatch.batch. + filter(_.isInstanceOf[PersistentRepr]). + foreach(p ⇒ builder.addBatch(persistentMessageBuilder(p.asInstanceOf[PersistentRepr]))) builder } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index 179bd46f30..542bcb26b0 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -18,7 +18,7 @@ object PersistentActorSpec { final case class Cmd(data: Any) final case class Evt(data: Any) - abstract class ExampleProcessor(name: String) extends NamedProcessor(name) with PersistentActor { + abstract class ExamplePersistentActor(name: String) extends NamedProcessor(name) with PersistentActor { var events: List[Any] = Nil val updateState: Receive = { @@ -33,14 +33,14 @@ object PersistentActorSpec { def receiveRecover = updateState } - class Behavior1Processor(name: String) extends ExampleProcessor(name) { + class Behavior1Processor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = commonBehavior orElse { case Cmd(data) ⇒ persist(Seq(Evt(s"${data}-1"), Evt(s"${data}-2")))(updateState) } } - class Behavior2Processor(name: String) extends ExampleProcessor(name) { + class Behavior2Processor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = commonBehavior orElse { case Cmd(data) ⇒ persist(Seq(Evt(s"${data}-1"), Evt(s"${data}-2")))(updateState) @@ -48,7 +48,7 @@ object PersistentActorSpec { } } - class Behavior3Processor(name: String) extends ExampleProcessor(name) { + class Behavior3Processor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = commonBehavior orElse { case Cmd(data) ⇒ persist(Seq(Evt(s"${data}-11"), Evt(s"${data}-12")))(updateState) @@ -56,7 +56,7 @@ object PersistentActorSpec { } } - class ChangeBehaviorInLastEventHandlerProcessor(name: String) extends ExampleProcessor(name) { + class ChangeBehaviorInLastEventHandlerProcessor(name: String) extends ExamplePersistentActor(name) { val newBehavior: Receive = { case Cmd(data) ⇒ persist(Evt(s"${data}-21"))(updateState) @@ -75,7 +75,7 @@ object PersistentActorSpec { } } - class ChangeBehaviorInFirstEventHandlerProcessor(name: String) extends ExampleProcessor(name) { + class ChangeBehaviorInFirstEventHandlerProcessor(name: String) extends ExamplePersistentActor(name) { val newBehavior: Receive = { case Cmd(data) ⇒ persist(Evt(s"${data}-21")) { event ⇒ @@ -94,7 +94,7 @@ object PersistentActorSpec { } } - class ChangeBehaviorInCommandHandlerFirstProcessor(name: String) extends ExampleProcessor(name) { + class ChangeBehaviorInCommandHandlerFirstProcessor(name: String) extends ExamplePersistentActor(name) { val newBehavior: Receive = { case Cmd(data) ⇒ context.unbecome() @@ -109,7 +109,7 @@ object PersistentActorSpec { } } - class ChangeBehaviorInCommandHandlerLastProcessor(name: String) extends ExampleProcessor(name) { + class ChangeBehaviorInCommandHandlerLastProcessor(name: String) extends ExamplePersistentActor(name) { val newBehavior: Receive = { case Cmd(data) ⇒ persist(Seq(Evt(s"${data}-31"), Evt(s"${data}-32")))(updateState) @@ -124,7 +124,7 @@ object PersistentActorSpec { } } - class SnapshottingPersistentActor(name: String, probe: ActorRef) extends ExampleProcessor(name) { + class SnapshottingPersistentActor(name: String, probe: ActorRef) extends ExamplePersistentActor(name) { override def receiveRecover = super.receiveRecover orElse { case SnapshotOffer(_, events: List[_]) ⇒ probe ! "offered" @@ -160,14 +160,14 @@ object PersistentActorSpec { } } - class ReplyInEventHandlerProcessor(name: String) extends ExampleProcessor(name) { + class ReplyInEventHandlerProcessor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = { case Cmd("a") ⇒ persist(Evt("a"))(evt ⇒ sender() ! evt.data) case p: Persistent ⇒ sender() ! p // not expected } } - class UserStashProcessor(name: String) extends ExampleProcessor(name) { + class UserStashProcessor(name: String) extends ExamplePersistentActor(name) { var stashed = false val receiveCommand: Receive = { case Cmd("a") ⇒ if (!stashed) { stash(); stashed = true } else sender() ! "a" @@ -176,7 +176,7 @@ object PersistentActorSpec { } } - class UserStashManyProcessor(name: String) extends ExampleProcessor(name) { + class UserStashManyProcessor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = commonBehavior orElse { case Cmd("a") ⇒ persist(Evt("a")) { evt ⇒ updateState(evt) @@ -196,7 +196,7 @@ object PersistentActorSpec { case other ⇒ stash() } } - class AsyncPersistProcessor(name: String) extends ExampleProcessor(name) { + class AsyncPersistProcessor(name: String) extends ExamplePersistentActor(name) { var counter = 0 val receiveCommand: Receive = commonBehavior orElse { @@ -212,7 +212,7 @@ object PersistentActorSpec { counter } } - class AsyncPersistThreeTimesProcessor(name: String) extends ExampleProcessor(name) { + class AsyncPersistThreeTimesProcessor(name: String) extends ExamplePersistentActor(name) { var counter = 0 val receiveCommand: Receive = commonBehavior orElse { @@ -231,7 +231,7 @@ object PersistentActorSpec { counter } } - class AsyncPersistSameEventTwiceProcessor(name: String) extends ExampleProcessor(name) { + class AsyncPersistSameEventTwiceProcessor(name: String) extends ExamplePersistentActor(name) { // atomic because used from inside the *async* callbacks val sendMsgCounter = new AtomicInteger() @@ -249,7 +249,7 @@ object PersistentActorSpec { persistAsync(event) { evt ⇒ sender() ! s"${evt.data}-b-${sendMsgCounter.incrementAndGet()}" } } } - class AsyncPersistAndPersistMixedSyncAsyncSyncProcessor(name: String) extends ExampleProcessor(name) { + class AsyncPersistAndPersistMixedSyncAsyncSyncProcessor(name: String) extends ExamplePersistentActor(name) { var counter = 0 @@ -276,12 +276,10 @@ object PersistentActorSpec { counter } } - class AsyncPersistAndPersistMixedSyncAsyncProcessor(name: String) extends ExampleProcessor(name) { + class AsyncPersistAndPersistMixedSyncAsyncProcessor(name: String) extends ExamplePersistentActor(name) { var sendMsgCounter = 0 - val start = System.currentTimeMillis() - def time = s" ${System.currentTimeMillis() - start}ms" val receiveCommand: Receive = commonBehavior orElse { case Cmd(data) ⇒ sender() ! data @@ -301,7 +299,7 @@ object PersistentActorSpec { } } - class UserStashFailureProcessor(name: String) extends ExampleProcessor(name) { + class UserStashFailureProcessor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = commonBehavior orElse { case Cmd(data) ⇒ if (data == "b-2") throw new TestException("boom") @@ -322,7 +320,7 @@ object PersistentActorSpec { } } - class AnyValEventProcessor(name: String) extends ExampleProcessor(name) { + class AnyValEventProcessor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = { case Cmd("a") ⇒ persist(5)(evt ⇒ sender() ! evt) } @@ -348,6 +346,44 @@ object PersistentActorSpec { } } + class DeferringWithPersistActor(name: String) extends ExamplePersistentActor(name) { + val receiveCommand: Receive = { + case Cmd(data) ⇒ + defer("d-1") { sender() ! _ } + persist(s"$data-2") { sender() ! _ } + defer("d-3") { sender() ! _ } + defer("d-4") { sender() ! _ } + } + } + class DeferringWithAsyncPersistActor(name: String) extends ExamplePersistentActor(name) { + val receiveCommand: Receive = { + case Cmd(data) ⇒ + defer(s"d-$data-1") { sender() ! _ } + persistAsync(s"pa-$data-2") { sender() ! _ } + defer(s"d-$data-3") { sender() ! _ } + defer(s"d-$data-4") { sender() ! _ } + } + } + class DeferringMixedCallsPPADDPADPersistActor(name: String) extends ExamplePersistentActor(name) { + val receiveCommand: Receive = { + case Cmd(data) ⇒ + persist(s"p-$data-1") { sender() ! _ } + persistAsync(s"pa-$data-2") { sender() ! _ } + defer(s"d-$data-3") { sender() ! _ } + defer(s"d-$data-4") { sender() ! _ } + persistAsync(s"pa-$data-5") { sender() ! _ } + defer(s"d-$data-6") { sender() ! _ } + } + } + class DeferringWithNoPersistCallsPersistActor(name: String) extends ExamplePersistentActor(name) { + val receiveCommand: Receive = { + case Cmd(data) ⇒ + defer("d-1") { sender() ! _ } + defer("d-2") { sender() ! _ } + defer("d-3") { sender() ! _ } + } + } + } abstract class PersistentActorSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { @@ -625,6 +661,71 @@ abstract class PersistentActorSpec(config: Config) extends AkkaSpec(config) with expectNoMsg(100.millis) } + "allow deferring handlers in order to provide ordered processing in respect to persist handlers" in { + val processor = namedProcessor[DeferringWithPersistActor] + processor ! Cmd("a") + expectMsg("d-1") + expectMsg("a-2") + expectMsg("d-3") + expectMsg("d-4") + expectNoMsg(100.millis) + } + "allow deferring handlers in order to provide ordered processing in respect to asyncPersist handlers" in { + val processor = namedProcessor[DeferringWithAsyncPersistActor] + processor ! Cmd("a") + expectMsg("d-a-1") + expectMsg("pa-a-2") + expectMsg("d-a-3") + expectMsg("d-a-4") + expectNoMsg(100.millis) + } + "invoke deferred handlers, in presence of mixed a long series persist / persistAsync calls" in { + val processor = namedProcessor[DeferringMixedCallsPPADDPADPersistActor] + val p1, p2 = TestProbe() + + processor.tell(Cmd("a"), p1.ref) + processor.tell(Cmd("b"), p2.ref) + p1.expectMsg("p-a-1") + p1.expectMsg("pa-a-2") + p1.expectMsg("d-a-3") + p1.expectMsg("d-a-4") + p1.expectMsg("pa-a-5") + p1.expectMsg("d-a-6") + + p2.expectMsg("p-b-1") + p2.expectMsg("pa-b-2") + p2.expectMsg("d-b-3") + p2.expectMsg("d-b-4") + p2.expectMsg("pa-b-5") + p2.expectMsg("d-b-6") + + expectNoMsg(100.millis) + } + "invoke deferred handlers right away, if there are no pending persist handlers registered" in { + val processor = namedProcessor[DeferringWithNoPersistCallsPersistActor] + processor ! Cmd("a") + expectMsg("d-1") + expectMsg("d-2") + expectMsg("d-3") + expectNoMsg(100.millis) + } + "invoke deferred handlers, perserving the original sender references" in { + val processor = namedProcessor[DeferringWithAsyncPersistActor] + val p1, p2 = TestProbe() + + processor.tell(Cmd("a"), p1.ref) + processor.tell(Cmd("b"), p2.ref) + p1.expectMsg("d-a-1") + p1.expectMsg("pa-a-2") + p1.expectMsg("d-a-3") + p1.expectMsg("d-a-4") + + p2.expectMsg("d-b-1") + p2.expectMsg("pa-b-2") + p2.expectMsg("d-b-3") + p2.expectMsg("d-b-4") + expectNoMsg(100.millis) + } "receive RecoveryFinished if it is handled after all events have been replayed" in { val processor1 = system.actorOf(Props(classOf[SnapshottingPersistentActor], name, testActor)) processor1 ! Cmd("b") diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java index c924b2238c..a943427f2f 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java @@ -4,17 +4,19 @@ package doc; -import java.util.concurrent.TimeUnit; - +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; import akka.japi.pf.ReceiveBuilder; +import akka.persistence.*; import scala.Option; import scala.PartialFunction; import scala.concurrent.duration.Duration; - -import akka.actor.*; -import akka.persistence.*; import scala.runtime.BoxedUnit; +import java.util.concurrent.TimeUnit; + import static java.util.Arrays.asList; public class LambdaPersistenceDocTest { @@ -359,7 +361,7 @@ public class LambdaPersistenceDocTest { static Object o8 = new Object() { //#reliable-event-delivery - class MyEventsourcedProcessor extends AbstractEventsourcedProcessor { + class MyEventsourcedProcessor extends AbstractPersistentActor { private ActorRef destination; private ActorRef channel; @@ -392,6 +394,103 @@ public class LambdaPersistenceDocTest { }; static Object o9 = new Object() { + //#persist-async + class MyPersistentActor extends AbstractPersistentActor { + + private void handleCommand(String c) { + sender().tell(c, self()); + + persistAsync(String.format("evt-%s-1", c), e -> { + sender().tell(e, self()); + }); + persistAsync(String.format("evt-%s-2", c), e -> { + sender().tell(e, self()); + }); + } + + @Override public PartialFunction receiveRecover() { + return ReceiveBuilder. + match(String.class, this::handleCommand).build(); + } + + @Override public PartialFunction receiveCommand() { + return ReceiveBuilder. + match(String.class, this::handleCommand).build(); + } + } + //#persist-async + + public void usage() { + final ActorSystem system = ActorSystem.create("example"); + //#persist-async-usage + final ActorRef processor = system.actorOf(Props.create(MyPersistentActor.class)); + processor.tell("a", null); + processor.tell("b", null); + + // possible order of received messages: + // a + // b + // evt-a-1 + // evt-a-2 + // evt-b-1 + // evt-b-2 + //#persist-async-usage + } + }; + + + static Object o10 = new Object() { + //#defer + class MyPersistentActor extends AbstractPersistentActor { + + private void handleCommand(String c) { + persistAsync(String.format("evt-%s-1", c), e -> { + sender().tell(e, self()); + }); + persistAsync(String.format("evt-%s-2", c), e -> { + sender().tell(e, self()); + }); + + defer(String.format("evt-%s-3", c), e -> { + sender().tell(e, self()); + }); + } + + @Override public PartialFunction receiveRecover() { + return ReceiveBuilder. + match(String.class, this::handleCommand).build(); + } + + @Override public PartialFunction receiveCommand() { + return ReceiveBuilder. + match(String.class, this::handleCommand).build(); + } + } + //#defer + + public void usage() { + final ActorSystem system = ActorSystem.create("example"); + final ActorRef sender = null; // your imaginary sender here + //#defer-caller + final ActorRef processor = system.actorOf(Props.create(MyPersistentActor.class)); + processor.tell("a", sender); + processor.tell("b", sender); + + // order of received messages: + // a + // b + // evt-a-1 + // evt-a-2 + // evt-a-3 + // evt-b-1 + // evt-b-2 + // evt-b-3 + //#defer-caller + } + }; + + + static Object o11 = new Object() { //#view class MyView extends AbstractView { @Override diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java index 81302af637..a527504a46 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java @@ -48,7 +48,7 @@ class ExampleState implements Serializable { private final ArrayList events; public ExampleState() { - this(new ArrayList()); + this(new ArrayList<>()); } public ExampleState(ArrayList events) { @@ -56,7 +56,7 @@ class ExampleState implements Serializable { } public ExampleState copy() { - return new ExampleState(new ArrayList(events)); + return new ExampleState(new ArrayList<>(events)); } public void update(Evt evt) {