diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/Common.scala b/akka-bench-jmh/src/main/scala/akka/persistence/Common.scala new file mode 100644 index 0000000000..391aff526d --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/persistence/Common.scala @@ -0,0 +1,15 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.persistence + +import akka.actor.Actor + +/** only as a "the best we could possibly get" baseline, does not persist anything */ +class BaselineActor(respondAfter: Int) extends Actor { + override def receive = { + case n: Int => if (n == respondAfter) sender() ! n + } +} + +final case class Evt(i: Int) diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/PersistenceActorDeferBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/persistence/PersistenceActorDeferBenchmark.scala new file mode 100644 index 0000000000..788c848e7a --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/persistence/PersistenceActorDeferBenchmark.scala @@ -0,0 +1,140 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.persistence + +import org.openjdk.jmh.annotations._ +import org.openjdk.jmh._ +import com.typesafe.config.ConfigFactory +import akka.actor._ +import akka.testkit.TestProbe +import java.io.File +import org.apache.commons.io.FileUtils +import org.openjdk.jmh.annotations.Scope + +/* + # OS: OSX 10.9.3 + # CPU: Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz + # Date: Mon Jun 9 13:22:42 CEST 2014 + + [info] Benchmark Mode Samples Mean Mean error Units + [info] a.p.PersistentActorDeferBenchmark.tell_persistAsync_defer_persistAsync_reply thrpt 10 6.858 0.515 ops/ms + [info] a.p.PersistentActorDeferBenchmark.tell_persistAsync_defer_persistAsync_replyASAP thrpt 10 20.256 2.941 ops/ms + [info] a.p.PersistentActorDeferBenchmark.tell_processor_Persistent_reply thrpt 10 6.531 0.114 ops/ms + [info] a.p.PersistentActorDeferBenchmark.tell_processor_Persistent_replyASAP thrpt 10 26.000 0.694 ops/ms + */ +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.Throughput)) +class PersistentActorDeferBenchmark { + + val config = PersistenceSpec.config("leveldb", "benchmark") + + lazy val storageLocations = List( + "akka.persistence.journal.leveldb.dir", + "akka.persistence.journal.leveldb-shared.store.dir", + "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + + var system: ActorSystem = _ + + var probe: TestProbe = _ + var processor: ActorRef = _ + var processor_replyASAP: ActorRef = _ + var persistAsync_defer: ActorRef = _ + var persistAsync_defer_replyASAP: ActorRef = _ + + val data10k = (1 to 10000).toArray + + @Setup + def setup() { + system = ActorSystem("test", config) + + probe = TestProbe()(system) + + storageLocations.foreach(FileUtils.deleteDirectory) + processor = system.actorOf(Props(classOf[`processor, forward Persistent, like defer`], data10k.last), "p-1") + processor_replyASAP = system.actorOf(Props(classOf[`processor, forward Persistent, reply ASAP`], data10k.last), "p-2") + persistAsync_defer = system.actorOf(Props(classOf[`persistAsync, defer`], data10k.last), "a-1") + persistAsync_defer_replyASAP = system.actorOf(Props(classOf[`persistAsync, defer, respond ASAP`], data10k.last), "a-2") + } + + @TearDown + def shutdown() { + system.shutdown() + system.awaitTermination() + + storageLocations.foreach(FileUtils.deleteDirectory) + } + + @GenerateMicroBenchmark + @OperationsPerInvocation(10000) + def tell_processor_Persistent_reply() { + for (i <- data10k) processor.tell(i, probe.ref) + + probe.expectMsg(data10k.last) + } + + @GenerateMicroBenchmark + @OperationsPerInvocation(10000) + def tell_processor_Persistent_replyASAP() { + for (i <- data10k) processor_replyASAP.tell(i, probe.ref) + + probe.expectMsg(data10k.last) + } + + @GenerateMicroBenchmark + @OperationsPerInvocation(10000) + def tell_persistAsync_defer_persistAsync_reply() { + for (i <- data10k) persistAsync_defer.tell(i, probe.ref) + + probe.expectMsg(data10k.last) + } + + @GenerateMicroBenchmark + @OperationsPerInvocation(10000) + def tell_persistAsync_defer_persistAsync_replyASAP() { + for (i <- data10k) persistAsync_defer_replyASAP.tell(i, probe.ref) + + probe.expectMsg(data10k.last) + } + +} + +class `processor, forward Persistent, like defer`(respondAfter: Int) extends Processor { + def receive = { + case n: Int => + self forward Persistent(Evt(n)) + self forward Evt(n) + case Persistent(p) => // ignore + case Evt(n) if n == respondAfter => sender() ! respondAfter + } +} +class `processor, forward Persistent, reply ASAP`(respondAfter: Int) extends Processor { + def receive = { + case n: Int => + self forward Persistent(Evt(n)) + if (n == respondAfter) sender() ! respondAfter + case _ => // ignore + } +} + +class `persistAsync, defer`(respondAfter: Int) extends PersistentActor { + override def receiveCommand = { + case n: Int => + persistAsync(Evt(n)) { e => } + defer(Evt(n)) { e => if (e.i == respondAfter) sender() ! e.i } + } + override def receiveRecover = { + case _ => // do nothing + } +} +class `persistAsync, defer, respond ASAP`(respondAfter: Int) extends PersistentActor { + override def receiveCommand = { + case n: Int => + persistAsync(Evt(n)) { e => } + defer(Evt(n)) { e => } + if (n == respondAfter) sender() ! n + } + override def receiveRecover = { + case _ => // do nothing + } +} diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorThroughputBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorBenchmark.scala similarity index 93% rename from akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorThroughputBenchmark.scala rename to akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorBenchmark.scala index 71804d6ede..39f6702b71 100644 --- a/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorThroughputBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorBenchmark.scala @@ -61,7 +61,7 @@ class PersistentActorThroughputBenchmark { def tell_normalActor_reply_baseline() { for (i <- data10k) actor.tell(i, probe.ref) - probe.expectMsg(Evt(data10k.last)) + probe.expectMsg(data10k.last) } @GenerateMicroBenchmark @@ -95,9 +95,9 @@ class PersistentActorThroughputBenchmark { probe.expectMsg(Evt(data10k.last)) } + } -final case class Evt(i: Int) class Persist1EventPersistentActor(respondAfter: Int) extends PersistentActor { override def receiveCommand = { @@ -135,10 +135,3 @@ class PersistAsync1EventQuickReplyPersistentActor(respondAfter: Int) extends Per case _ => // do nothing } } - -/** only as a "the best we could possibly get" baseline, does not persist anything */ -class BaselineActor(respondAfter: Int) extends Actor { - override def receive = { - case n: Int => if (n == respondAfter) sender() ! Evt(n) - } -} \ No newline at end of file diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index c18ed62c07..4f70f830fc 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -319,9 +319,9 @@ public class PersistenceDocTest { final ActorRef processor = system.actorOf(Props.create(MyProcessor.class)); public void batchWrite() { - processor.tell(PersistentBatch.create(asList( - Persistent.create("a"), - Persistent.create("b"))), null); + processor.tell(PersistentBatch.create(asList( + Persistent.create("a"), + Persistent.create("b"))), null); } // ... @@ -423,7 +423,7 @@ public class PersistenceDocTest { public void usage() { final ActorSystem system = ActorSystem.create("example"); - //#view-update + //#persist-async-usage final ActorRef processor = system.actorOf(Props.create(MyPersistentActor.class)); processor.tell("a", null); processor.tell("b", null); @@ -435,11 +435,56 @@ public class PersistenceDocTest { // evt-a-2 // evt-b-1 // evt-b-2 - //#view-update + //#persist-async-usage } }; static Object o10 = new Object() { + //#defer + class MyPersistentActor extends UntypedPersistentActor { + + @Override + public void onReceiveRecover(Object msg) { + // handle recovery here + } + + @Override + public void onReceiveCommand(Object msg) { + final Procedure replyToSender = new Procedure() { + @Override + public void apply(String event) throws Exception { + sender().tell(event, self()); + } + }; + + persistAsync(String.format("evt-%s-1", msg), replyToSender); + persistAsync(String.format("evt-%s-2", msg), replyToSender); + defer(String.format("evt-%s-3", msg), replyToSender); + } + } + //#defer + + public void usage() { + final ActorSystem system = ActorSystem.create("example"); + //#defer-caller + final ActorRef processor = system.actorOf(Props.create(MyPersistentActor.class)); + processor.tell("a", null); + processor.tell("b", null); + + // order of received messages: + // a + // b + // evt-a-1 + // evt-a-2 + // evt-a-3 + // evt-b-1 + // evt-b-2 + // evt-b-3 + //#defer-caller + } + }; + + static Object o11 = new Object() { //#view class MyView extends UntypedView { @Override diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 04caf62fcc..4745b763e1 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -40,9 +40,14 @@ Akka persistence is a separate jar file. Make sure that you have the following d Architecture ============ -* *Processor*: A processor is a persistent, stateful actor. Messages sent to a processor are written to a journal - before its behavior is called. When a processor is started or restarted, journaled messages are replayed - to that processor, so that it can recover internal state from these messages. +* *Processor* (deprecated, use *PersistentActor* instead): A processor is a persistent, stateful actor. Messages sent + to a processor are written to a journal before its ``onReceive`` method is called. When a processor is started or + restarted, journaled messages are replayed to that processor, so that it can recover internal state from these messages. + +* *PersistentActor*: Is a persistent, stateful actor. It is able to persist events to a journal and can react to + them in a thread-safe manner. It can be used to implement both *command* as well as *event sourced* actors. + When a persistent actor is started or restarted, journaled messages are replayed to that actor, so that it can + recover internal state from these messages. * *View*: A view is a persistent, stateful actor that receives journaled messages that have been written by another processor. A view itself does not journal new messages, instead, it updates internal state only from a processor's @@ -427,6 +432,11 @@ use the ``deleteSnapshots`` method. Event sourcing ============== +.. note:: + The ``PersistentActor`` introduced in this section was previously known as ``EventsourcedProcessor``, + which was a subset of the ``PersistentActor``. Migrating your code to use persistent actors instead is + very simple and is explained in the :ref:`migration-guide-persistence-experimental-2.3.x-2.4.x`. + In all the examples so far, messages that change a processor's state have been sent as ``Persistent`` messages by an application, so that they can be replayed during recovery. From this point of view, the journal acts as a write-ahead-log for whatever ``Persistent`` messages a processor receives. This is also known as *command @@ -484,6 +494,50 @@ It contains instructions on how to run the ``PersistentActorExample``. recovery you need to take special care to perform the same state transitions with ``become`` and ``unbecome`` in the ``receiveRecover`` method as you would have done in the command handler. +Relaxed local consistency requirements and high throughput use-cases +-------------------------------------------------------------------- + +If faced with Relaxed local consistency requirements and high throughput demands sometimes ``PersistentActor`` and it's +``persist`` may not be enough in terms of consuming incoming Commands at a high rate, because it has to wait until all +Events related to a given Command are processed in order to start processing the next Command. While this abstraction is +very useful for most cases, sometimes you may be faced with relaxed requirements about consistency – for example you may +want to process commands as fast as you can, assuming that Event will eventually be persisted and handled properly in +the background and retroactively reacting to persistence failures if needed. + +The ``persistAsync`` method provides a tool for implementing high-throughput processors. It will *not* +stash incoming Commands while the Journal is still working on persisting and/or user code is executing event callbacks. + +In the below example, the event callbacks may be called "at any time", even after the next Command has been processed. +The ordering between events is still guaranteed ("evt-b-1" will be sent after "evt-a-2", which will be sent after "evt-a-1" etc.). + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#persist-async + +Notice that the client does not have to wrap any messages in the `Persistent` class in order to obtain "command sourcing like" +semantics. It's up to the processor to decide about persisting (or not) of messages, unlike ``Processor`` where the sender had to be aware of this decision. + +.. note:: + In order to implement the pattern known as "*command sourcing*" simply ``persistAsync`` all incoming events right away, + and handle them in the callback. + +.. _defer-java-lambda: + +Deferring actions until preceeding persist handlers have executed +----------------------------------------------------------------- + +Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of +''happens-after the previous ``persistAsync`` handlers have been invoked''. ``PersistentActor`` provides an utility method +called ``defer``, which works similarily to ``persistAsync`` yet does not persist the passed in event. It is recommended to +use it for *read* operations, and actions which do not have corresponding events in your domain model. + +Using this method is very similar to the persist family of methods, yet it does **not** persist the passed in event. +It will be kept in memory and used when invoking the handler. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#defer + +Notice that the ``sender()`` is **safe** to access in the handler callback, and will be pointing to the original sender +of the command for which this ``defer`` handler was called. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#defer-caller Reliable event delivery ----------------------- diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 77c437cfb9..804b1b1aae 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -546,10 +546,34 @@ The ordering between events is still guaranteed ("evt-b-1" will be sent after "e .. includecode:: code/docs/persistence/PersistenceDocTest.java#persist-async +Notice that the client does not have to wrap any messages in the `Persistent` class in order to obtain "command sourcing like" +semantics. It's up to the processor to decide about persisting (or not) of messages, unlike ``Processor`` where the sender had to be aware of this decision. + .. note:: In order to implement the pattern known as "*command sourcing*" simply ``persistAsync`` all incoming events right away, and handle them in the callback. +.. _defer-java: + +Deferring actions until preceeding persist handlers have executed +----------------------------------------------------------------- + +Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of +''happens-after the previous ``persistAsync`` handlers have been invoked''. ``PersistentActor`` provides an utility method +called ``defer``, which works similarily to ``persistAsync`` yet does not persist the passed in event. It is recommended to +use it for *read* operations, and actions which do not have corresponding events in your domain model. + +Using this method is very similar to the persist family of methods, yet it does **not** persist the passed in event. +It will be kept in memory and used when invoking the handler. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#defer + +Notice that the ``sender()`` is **safe** to access in the handler callback, and will be pointing to the original sender +of the command for which this ``defer`` handler was called. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#defer-caller + + Reliable event delivery ----------------------- diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index c054e40e0a..7f6d5d614f 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -23,7 +23,7 @@ trait PersistenceDocSpec { trait SomeOtherMessage - val system: ActorSystem + implicit val system: ActorSystem import system._ @@ -33,11 +33,11 @@ trait PersistenceDocSpec { class MyProcessor extends Processor { def receive = { - case Persistent(payload, sequenceNr) => + case Persistent(payload, sequenceNr) => // message successfully written to journal case PersistenceFailure(payload, sequenceNr, cause) => // message failed to be written to journal - case m: SomeOtherMessage => + case m: SomeOtherMessage => // message not written to journal } } @@ -367,6 +367,45 @@ trait PersistenceDocSpec { //#persist-async } + new AnyRef { + import akka.actor.ActorRef + + val processor = system.actorOf(Props[MyPersistentActor]()) + + //#defer + class MyPersistentActor extends PersistentActor { + + def receiveRecover: Receive = { + case _ => // handle recovery here + } + + def receiveCommand: Receive = { + case c: String => { + sender() ! c + persistAsync(s"evt-$c-1") { e => sender() ! e } + persistAsync(s"evt-$c-2") { e => sender() ! e } + defer(s"evt-$c-3") { e => sender() ! e } + } + } + } + //#defer + + //#defer-caller + processor ! "a" + processor ! "b" + + // order of received messages: + // a + // b + // evt-a-1 + // evt-a-2 + // evt-a-3 + // evt-b-1 + // evt-b-2 + // evt-b-3 + + //#defer-caller + } new AnyRef { import akka.actor.Props @@ -385,4 +424,47 @@ trait PersistenceDocSpec { view ! Update(await = true) //#view-update } + + new AnyRef { + // ------------------------------------------------------------------------------------------------ + // FIXME: uncomment once going back to project dependencies (in akka-stream-experimental) + // ------------------------------------------------------------------------------------------------ + /* + //#producer-creation + import org.reactivestreams.api.Producer + + import akka.persistence.Persistent + import akka.persistence.stream.{ PersistentFlow, PersistentPublisherSettings } + import akka.stream.{ FlowMaterializer, MaterializerSettings } + import akka.stream.scaladsl.Flow + + val materializer = FlowMaterializer(MaterializerSettings()) + + val flow: Flow[Persistent] = PersistentFlow.fromProcessor("some-processor-id") + val producer: Producer[Persistent] = flow.toProducer(materializer) + //#producer-creation + + //#producer-buffer-size + PersistentFlow.fromProcessor("some-processor-id", PersistentPublisherSettings(maxBufferSize = 200)) + //#producer-buffer-size + + //#producer-examples + // 1 producer and 2 consumers: + val producer1: Producer[Persistent] = + PersistentFlow.fromProcessor("processor-1").toProducer(materializer) + Flow(producer1).foreach(p => println(s"consumer-1: ${p.payload}")).consume(materializer) + Flow(producer1).foreach(p => println(s"consumer-2: ${p.payload}")).consume(materializer) + + // 2 producers (merged) and 1 consumer: + val producer2: Producer[Persistent] = + PersistentFlow.fromProcessor("processor-2").toProducer(materializer) + val producer3: Producer[Persistent] = + PersistentFlow.fromProcessor("processor-3").toProducer(materializer) + Flow(producer2).merge(producer3).foreach { p => + println(s"consumer-3: ${p.payload}") + }.consume(materializer) + //#producer-examples + */ + } + } diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 3325bc4b2d..2fa7805d86 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -260,7 +260,7 @@ request instructs a channel to send a ``Persistent`` message to a destination. preserved by a channel, therefore, a destination can reply to the sender of a ``Deliver`` request. .. note:: - + Sending via a channel has at-least-once delivery semantics—by virtue of either the sending actor or the channel being persistent—which means that the semantics do not match those of a normal :class:`ActorRef` send operation: @@ -558,14 +558,34 @@ The ordering between events is still guaranteed ("evt-b-1" will be sent after "e .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#persist-async Notice that the client does not have to wrap any messages in the `Persistent` class in order to obtain "command sourcing like" -semantics. It's up to the processor to decide about persisting (or not) of messages, unlike ``Processor`` where this decision -was made by the sender. - +semantics. It's up to the processor to decide about persisting (or not) of messages, unlike ``Processor`` where the sender had to be aware of this decision. .. note:: In order to implement the "*command sourcing*" simply call ``persistAsync(cmd)(...)`` right away on all incomming messages right away, and handle them in the callback. +.. _defer-scala: + +Deferring actions until preceeding persist handlers have executed +----------------------------------------------------------------- + +Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of +''happens-after the previous ``persistAsync`` handlers have been invoked''. ``PersistentActor`` provides an utility method +called ``defer``, which works similarily to ``persistAsync`` yet does not persist the passed in event. It is recommended to +use it for *read* operations, and actions which do not have corresponding events in your domain model. + +Using this method is very similar to the persist family of methods, yet it does **not** persist the passed in event. +It will be kept in memory and used when invoking the handler. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#defer + +Notice that the ``sender()`` is **safe** to access in the handler callback, and will be pointing to the original sender +of the command for which this ``defer`` handler was called. + +The calling side will get the responses in this (guaranteed) order: + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#defer-caller + Reliable event delivery ----------------------- diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 77324901d3..ac8a05b658 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -10,7 +10,7 @@ import scala.collection.immutable import akka.japi.{ Procedure, Util } import akka.persistence.JournalProtocol._ -import akka.actor.AbstractActor +import akka.actor.{ ActorRef, AbstractActor } /** * INTERNAL API. @@ -58,8 +58,15 @@ private[persistence] trait Eventsourced extends Processor { throw new UnsupportedOperationException("Persistent command batches not supported") case _: PersistentRepr ⇒ throw new UnsupportedOperationException("Persistent commands not supported") - case WriteMessageSuccess(p) ⇒ - withCurrentPersistent(p)(p ⇒ persistInvocations.get(0).handler(p.payload)) + case WriteMessageSuccess(r) ⇒ + r match { + case p: PersistentRepr ⇒ + withCurrentPersistent(p)(p ⇒ pendingInvocations.peek().handler(p.payload)) + case _ ⇒ pendingInvocations.peek().handler(r.payload) + } + onWriteComplete() + case LoopMessageSuccess(l) ⇒ + pendingInvocations.peek().handler(l) onWriteComplete() case s @ WriteMessagesSuccessful ⇒ Eventsourced.super.aroundReceive(receive, s) case f: WriteMessagesFailed ⇒ Eventsourced.super.aroundReceive(receive, f) @@ -74,28 +81,13 @@ private[persistence] trait Eventsourced extends Processor { currentState = persistingEvents } - if (persistentEventBatch.nonEmpty) { - Eventsourced.super.aroundReceive(receive, PersistentBatch(persistentEventBatch.reverse)) - persistentEventBatch = Nil - } else { - processorStash.unstash() - } + if (resequenceableEventBatch.nonEmpty) flushBatch() + else processorStash.unstash() } private def onWriteComplete(): Unit = { - persistInvocations.remove(0) - - val nextIsStashing = !persistInvocations.isEmpty && persistInvocations.get(0).isInstanceOf[StashingPersistInvocation] - - if (nextIsStashing) { - currentState = persistingEvents - } - - if (persistInvocations.isEmpty) { - processorStash.unstash() - } + pendingInvocations.pop() } - } /** @@ -106,37 +98,48 @@ private[persistence] trait Eventsourced extends Processor { private val persistingEvents: State = new State { override def toString: String = "persisting events" - def aroundReceive(receive: Receive, message: Any) = message match { + def aroundReceive(receive: Receive, message: Any): Unit = message match { case _: ConfirmablePersistent ⇒ processorStash.stash() case PersistentBatch(b) ⇒ - b.foreach(p ⇒ deleteMessage(p.sequenceNr, permanent = true)) + b foreach { + case p: PersistentRepr ⇒ deleteMessage(p.sequenceNr, permanent = true) + case r ⇒ // ignore, nothing to delete (was not a persistent message) + } throw new UnsupportedOperationException("Persistent command batches not supported") case p: PersistentRepr ⇒ deleteMessage(p.sequenceNr, permanent = true) throw new UnsupportedOperationException("Persistent commands not supported") - case WriteMessageSuccess(p) ⇒ - val invocation = persistInvocations.get(0) - withCurrentPersistent(p)(p ⇒ invocation.handler(p.payload)) - onWriteComplete(invocation) + + case WriteMessageSuccess(m) ⇒ + m match { + case p: PersistentRepr ⇒ withCurrentPersistent(p)(p ⇒ pendingInvocations.peek().handler(p.payload)) + case _ ⇒ pendingInvocations.peek().handler(m.payload) + } + onWriteComplete() + case e @ WriteMessageFailure(p, _) ⇒ Eventsourced.super.aroundReceive(receive, message) // stops actor by default - onWriteComplete(persistInvocations.get(0)) + onWriteComplete() + case LoopMessageSuccess(l) ⇒ + pendingInvocations.peek().handler(l) + onWriteComplete() case s @ WriteMessagesSuccessful ⇒ Eventsourced.super.aroundReceive(receive, s) case f: WriteMessagesFailed ⇒ Eventsourced.super.aroundReceive(receive, f) case other ⇒ processorStash.stash() } - private def onWriteComplete(invocation: PersistInvocation): Unit = { - if (invocation.isInstanceOf[StashingPersistInvocation]) { - // enables an early return to `processingCommands`, because if this counter hits `0`, - // we know the remaining persistInvocations are all `persistAsync` created, which - // means we can go back to processing commands also - and these callbacks will be called as soon as possible - pendingStashingPersistInvocations -= 1 + private def onWriteComplete(): Unit = { + pendingInvocations.pop() match { + case _: StashingHandlerInvocation ⇒ + // enables an early return to `processingCommands`, because if this counter hits `0`, + // we know the remaining pendingInvocations are all `persistAsync` created, which + // means we can go back to processing commands also - and these callbacks will be called as soon as possible + pendingStashingPersistInvocations -= 1 + case _ ⇒ // do nothing } - persistInvocations.remove(0) - if (persistInvocations.isEmpty || pendingStashingPersistInvocations == 0) { + if (pendingStashingPersistInvocations == 0) { currentState = processingCommands processorStash.unstash() } @@ -161,23 +164,29 @@ private[persistence] trait Eventsourced extends Processor { receiveRecover(RecoveryCompleted) } - sealed trait PersistInvocation { + sealed trait PendingHandlerInvocation { + def evt: Any def handler: Any ⇒ Unit } /** forces processor to stash incoming commands untill all these invocations are handled */ - final case class StashingPersistInvocation(evt: Any, handler: Any ⇒ Unit) extends PersistInvocation - /** does not force the processor to stash commands */ - final case class AsyncPersistInvocation(evt: Any, handler: Any ⇒ Unit) extends PersistInvocation + final case class StashingHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation + /** does not force the processor to stash commands; Originates from either `persistAsync` or `defer` calls */ + final case class AsyncHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation - /** Used instead of iterating `persistInvocations` in order to check if safe to revert to processing commands */ + /** Used instead of iterating `pendingInvocations` in order to check if safe to revert to processing commands */ private var pendingStashingPersistInvocations: Long = 0 /** Holds user-supplied callbacks for persist/persistAsync calls */ - private val persistInvocations = new java.util.LinkedList[PersistInvocation]() // we only append / isEmpty / get(0) on it - private var persistentEventBatch: List[PersistentRepr] = Nil + private val pendingInvocations = new java.util.LinkedList[PendingHandlerInvocation]() // we only append / isEmpty / get(0) on it + private var resequenceableEventBatch: List[Resequenceable] = Nil private var currentState: State = recovering private val processorStash = createStash() + private def flushBatch() { + Eventsourced.super.aroundReceive(receive, PersistentBatch(resequenceableEventBatch.reverse)) + resequenceableEventBatch = Nil + } + /** * Asynchronously persists `event`. On successful persistence, `handler` is called with the * persisted event. It is guaranteed that no new commands will be received by a processor @@ -202,8 +211,8 @@ private[persistence] trait Eventsourced extends Processor { */ final def persist[A](event: A)(handler: A ⇒ Unit): Unit = { pendingStashingPersistInvocations += 1 - persistInvocations addLast StashingPersistInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) - persistentEventBatch = PersistentRepr(event) :: persistentEventBatch + pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + resequenceableEventBatch = PersistentRepr(event) :: resequenceableEventBatch } /** @@ -237,8 +246,8 @@ private[persistence] trait Eventsourced extends Processor { * @param handler handler for each persisted `event` */ final def persistAsync[A](event: A)(handler: A ⇒ Unit): Unit = { - persistInvocations addLast AsyncPersistInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) - persistentEventBatch = PersistentRepr(event) :: persistentEventBatch + pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + resequenceableEventBatch = PersistentRepr(event) :: resequenceableEventBatch } /** @@ -252,6 +261,56 @@ private[persistence] trait Eventsourced extends Processor { final def persistAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = events.foreach(persistAsync(_)(handler)) + /** + * Defer the handler execution until all pending handlers have been executed. + * Allows to define logic within the actor, which will respect the invocation-order-guarantee + * in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer, + * the corresponding handlers will be invoked in the same order as they were registered in. + * + * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, + * if the given event should possible to replay. + * + * If there are no pending persist handler calls, the handler will be called immediatly. + * + * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the + * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. + * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers + * will not be run. + * + * @param event event to be handled in the future, when preceeding persist operations have been processes + * @param handler handler for the given `event` + */ + final def defer[A](event: A)(handler: A ⇒ Unit): Unit = { + if (pendingInvocations.isEmpty) { + handler(event) + } else { + pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + resequenceableEventBatch = NonPersistentRepr(event, sender()) :: resequenceableEventBatch + } + } + + /** + * Defer the handler execution until all pending handlers have been executed. + * Allows to define logic within the actor, which will respect the invocation-order-guarantee + * in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer, + * the corresponding handlers will be invoked in the same order as they were registered in. + * + * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, + * if the given event should possible to replay. + * + * If there are no pending persist handler calls, the handler will be called immediatly. + * + * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the + * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. + * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers + * will not be run. + * + * @param events event to be handled in the future, when preceeding persist operations have been processes + * @param handler handler for each `event` + */ + final def defer[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = + events.foreach(defer(_)(handler)) + /** * Recovery handler that receives persisted events during recovery. If a state snapshot * has been captured and saved, this handler will receive a [[SnapshotOffer]] message @@ -425,6 +484,50 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events final def persistAsync[A](events: JIterable[A])(handler: A ⇒ Unit): Unit = super[Eventsourced].persistAsync(Util.immutableSeq(events))(event ⇒ handler(event)) + /** + * Defer the handler execution until all pending handlers have been executed. + * Allows to define logic within the actor, which will respect the invocation-order-guarantee + * in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer, + * the corresponding handlers will be invoked in the same order as they were registered in. + * + * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, + * if the given event should possible to replay. + * + * If there are no pending persist handler calls, the handler will be called immediatly. + * + * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the + * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. + * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers + * will not be run. + * + * @param event event to be handled in the future, when preceeding persist operations have been processes + * @param handler handler for the given `event` + */ + final def defer[A](event: A)(handler: Procedure[A]): Unit = + super[Eventsourced].defer(event)(event ⇒ handler(event)) + + /** + * Defer the handler execution until all pending handlers have been executed. + * Allows to define logic within the actor, which will respect the invocation-order-guarantee + * in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer, + * the corresponding handlers will be invoked in the same order as they were registered in. + * + * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, + * if the given event should possible to replay. + * + * If there are no pending persist handler calls, the handler will be called immediatly. + * + * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the + * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. + * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers + * will not be run. + * + * @param events event to be handled in the future, when preceeding persist operations have been processes + * @param handler handler for each `event` + */ + final def defer[A](events: JIterable[A])(handler: Procedure[A]): Unit = + super[Eventsourced].defer(Util.immutableSeq(events))(event ⇒ handler(event)) + /** * Java API: recovery handler that receives persisted events during recovery. If a state snapshot * has been captured and saved, this handler will receive a [[SnapshotOffer]] message @@ -446,7 +549,7 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events * communication with other actors). On successful validation, one or more events are * derived from a command and these events are then persisted by calling `persist`. * Commands sent to event sourced processors must not be [[Persistent]] or - * [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is + * [[ResequenceableBatch]] messages. In this case an `UnsupportedOperationException` is * thrown by the processor. */ def onReceiveCommand(msg: Any): Unit @@ -458,7 +561,7 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events * communication with other actors). On successful validation, one or more events are * derived from a command and these events are then persisted by calling `persist`. * Commands sent to event sourced processors must not be [[Persistent]] or - * [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is + * [[ResequenceableBatch]] messages. In this case an `UnsupportedOperationException` is * thrown by the processor. */ @deprecated("AbstractEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4") @@ -516,6 +619,50 @@ abstract class AbstractEventsourcedProcessor extends AbstractActor with Eventsou final def persistAsync[A](event: A, handler: Procedure[A]): Unit = persistAsync(event)(event ⇒ handler(event)) + /** + * Defer the handler execution until all pending handlers have been executed. + * Allows to define logic within the actor, which will respect the invocation-order-guarantee + * in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer, + * the corresponding handlers will be invoked in the same order as they were registered in. + * + * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, + * if the given event should possible to replay. + * + * If there are no pending persist handler calls, the handler will be called immediatly. + * + * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the + * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. + * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers + * will not be run. + * + * @param event event to be handled in the future, when preceeding persist operations have been processes + * @param handler handler for the given `event` + */ + final def defer[A](event: A)(handler: Procedure[A]): Unit = + super.defer(event)(event ⇒ handler(event)) + + /** + * Defer the handler execution until all pending handlers have been executed. + * Allows to define logic within the actor, which will respect the invocation-order-guarantee + * in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer, + * the corresponding handlers will be invoked in the same order as they were registered in. + * + * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, + * if the given event should possible to replay. + * + * If there are no pending persist handler calls, the handler will be called immediatly. + * + * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the + * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. + * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers + * will not be run. + * + * @param events event to be handled in the future, when preceeding persist operations have been processes + * @param handler handler for each `event` + */ + final def defer[A](events: JIterable[A])(handler: Procedure[A]): Unit = + super.defer(Util.immutableSeq(events))(event ⇒ handler(event)) + /** * Java API: asynchronously persists `events` in specified order. This is equivalent to calling * `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`, diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index 030554cfd2..af60f07348 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -8,8 +8,6 @@ import scala.collection.immutable import akka.actor._ -import akka.persistence.serialization.Message - /** * INTERNAL API. * @@ -60,7 +58,7 @@ private[persistence] object JournalProtocol { * @param messages messages to be written. * @param processor write requestor. */ - final case class WriteMessages(messages: immutable.Seq[PersistentRepr], processor: ActorRef) + final case class WriteMessages(messages: immutable.Seq[Resequenceable], processor: ActorRef) /** * Reply message to a successful [[WriteMessages]] request. This reply is sent to the requestor diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index 22cdb268b4..2c957c4156 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -15,12 +15,21 @@ import akka.pattern.PromiseActorRef import akka.persistence.serialization.Message /** - * Persistent message. + * Marks messages which can be resequenced by the [[akka.persistence.journal.AsyncWriteJournal]]. + * + * In essence it is either an [[NonPersistentRepr]] or [[Persistent]]. */ -@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " + - "which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.", - since = "2.3.4") -sealed abstract class Persistent { +sealed trait Resequenceable { + def payload: Any + def sender: ActorRef +} + +/** Message which can be resequenced by the Journal, but will not be persisted. */ +final case class NonPersistentRepr(payload: Any, sender: ActorRef) extends Resequenceable + +/** Persistent message. */ +@deprecated("Use akka.persistence.PersistentActor instead.", since = "2.3.4") +sealed abstract class Persistent extends Resequenceable { /** * This persistent message's payload. */ @@ -41,9 +50,7 @@ sealed abstract class Persistent { def withPayload(payload: Any): Persistent } -@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " + - "which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.", - since = "2.3.4") +@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4") object Persistent { /** * Java API: creates a new persistent message. Must only be used outside processors. @@ -71,9 +78,7 @@ object Persistent { * @param payload payload of the new persistent message. * @param currentPersistentMessage optional current persistent message, defaults to `None`. */ - @deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " + - "which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.", - since = "2.3.4") + @deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4") def apply(payload: Any)(implicit currentPersistentMessage: Option[Persistent] = None): Persistent = currentPersistentMessage.map(_.withPayload(payload)).getOrElse(PersistentRepr(payload)) @@ -88,9 +93,7 @@ object Persistent { * Persistent message that has been delivered by a [[Channel]] or [[PersistentChannel]]. Channel * destinations that receive messages of this type can confirm their receipt by calling [[confirm]]. */ -@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " + - "which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.", - since = "2.3.4") +@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4") sealed abstract class ConfirmablePersistent extends Persistent { /** * Called by [[Channel]] and [[PersistentChannel]] destinations to confirm the receipt of a @@ -105,9 +108,7 @@ sealed abstract class ConfirmablePersistent extends Persistent { def redeliveries: Int } -@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " + - "which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.", - since = "2.3.4") +@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4") object ConfirmablePersistent { /** * [[ConfirmablePersistent]] extractor. @@ -121,18 +122,7 @@ object ConfirmablePersistent { * journal. The processor receives the written messages individually as [[Persistent]] messages. * During recovery, they are also replayed individually. */ -@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " + - "which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.", - since = "2.3.4") -case class PersistentBatch(persistentBatch: immutable.Seq[Persistent]) extends Message { - // todo while we want to remove Persistent() from user-land, the batch may (probably?) become private[akka] to remain for journal internals #15230 - - /** - * INTERNAL API. - */ - private[persistence] def persistentReprList: List[PersistentRepr] = - persistentBatch.toList.asInstanceOf[List[PersistentRepr]] -} +case class PersistentBatch(batch: immutable.Seq[Resequenceable]) extends Message /** * Plugin API: confirmation entry written by journal plugins. @@ -170,7 +160,7 @@ private[persistence] final case class PersistentIdImpl(processorId: String, sequ * @see [[journal.AsyncWriteJournal]] * @see [[journal.AsyncRecovery]] */ -trait PersistentRepr extends Persistent with PersistentId with Message { +trait PersistentRepr extends Persistent with Resequenceable with PersistentId with Message { // todo we want to get rid of the Persistent() wrapper from user land; PersistentRepr is here to stay. #15230 import scala.collection.JavaConverters._ diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index 4f8feb4f57..dd1e3f980b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -86,10 +86,11 @@ trait Processor extends Actor with Recovery { private var batching = false def aroundReceive(receive: Receive, message: Any) = message match { - case r: Recover ⇒ // ignore - case ReplayedMessage(p) ⇒ processPersistent(receive, p) // can occur after unstash from user stash - case WriteMessageSuccess(p) ⇒ processPersistent(receive, p) - case WriteMessageFailure(p, cause) ⇒ + case r: Recover ⇒ // ignore + case ReplayedMessage(p) ⇒ processPersistent(receive, p) // can occur after unstash from user stash + case WriteMessageSuccess(p: PersistentRepr) ⇒ processPersistent(receive, p) + case WriteMessageSuccess(r: Resequenceable) ⇒ process(receive, r) + case WriteMessageFailure(p, cause) ⇒ process(receive, PersistenceFailure(p.payload, p.sequenceNr, cause)) case LoopMessageSuccess(m) ⇒ process(receive, m) case WriteMessagesSuccessful | WriteMessagesFailed(_) ⇒ @@ -108,11 +109,15 @@ trait Processor extends Actor with Recovery { journal forward LoopMessage(m, self) } - def addToBatch(p: PersistentRepr): Unit = - processorBatch = processorBatch :+ p.update(processorId = processorId, sequenceNr = nextSequenceNr(), sender = sender()) + def addToBatch(p: Resequenceable): Unit = p match { + case p: PersistentRepr ⇒ + processorBatch = processorBatch :+ p.update(processorId = processorId, sequenceNr = nextSequenceNr(), sender = sender()) + case r ⇒ + processorBatch = processorBatch :+ r + } def addToBatch(pb: PersistentBatch): Unit = - pb.persistentReprList.foreach(addToBatch) + pb.batch.foreach(addToBatch) def maxBatchSizeReached: Boolean = processorBatch.length >= extension.settings.journal.maxMessageBatchSize @@ -151,10 +156,10 @@ trait Processor extends Actor with Recovery { */ private def onRecoveryCompleted(receive: Receive): Unit = receive.applyOrElse(RecoveryCompleted, unhandled) - + private val _processorId = extension.processorId(self) - private var processorBatch = Vector.empty[PersistentRepr] + private var processorBatch = Vector.empty[Resequenceable] private var sequenceNr: Long = 0L /** @@ -326,14 +331,14 @@ trait Processor extends Actor with Recovery { * @param cause failure cause. */ @SerialVersionUID(1L) -final case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throwable) +case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throwable) /** * Sent to a [[Processor]] if a journal fails to replay messages or fetch that processor's * highest sequence number. If not handled, the prossor will be stopped. */ @SerialVersionUID(1L) -final case class RecoveryFailure(cause: Throwable) +case class RecoveryFailure(cause: Throwable) abstract class RecoveryCompleted /** diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index fc1a9daccf..7cf0681e8c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -16,7 +16,7 @@ import akka.persistence._ /** * Abstract journal, optimized for asynchronous, non-blocking writes. */ -trait AsyncWriteJournal extends Actor with AsyncRecovery { +trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { import JournalProtocol._ import AsyncWriteJournal._ import context.dispatcher @@ -24,16 +24,17 @@ trait AsyncWriteJournal extends Actor with AsyncRecovery { private val extension = Persistence(context.system) private val publish = extension.settings.internal.publishPluginCommands - private val resequencer = context.actorOf(Props[Resequencer]) + private val resequencer = context.actorOf(Props[Resequencer]()) private var resequencerCounter = 1L def receive = { - case WriteMessages(persistentBatch, processor) ⇒ + case WriteMessages(resequenceables, processor) ⇒ val cctr = resequencerCounter - def resequence(f: PersistentRepr ⇒ Any) = persistentBatch.zipWithIndex.foreach { - case (p, i) ⇒ resequencer ! Desequenced(f(p), cctr + i + 1, processor, p.sender) + def resequence(f: PersistentRepr ⇒ Any) = resequenceables.zipWithIndex.foreach { + case (p: PersistentRepr, i) ⇒ resequencer ! Desequenced(f(p), cctr + i + 1, processor, p.sender) + case (r, i) ⇒ resequencer ! Desequenced(LoopMessageSuccess(r.payload), cctr + i + 1, processor, r.sender) } - asyncWriteMessages(persistentBatch.map(_.prepareWrite())) onComplete { + asyncWriteMessages(preparePersistentBatch(resequenceables)) onComplete { case Success(_) ⇒ resequencer ! Desequenced(WriteMessagesSuccessful, cctr, processor, self) resequence(WriteMessageSuccess(_)) @@ -41,7 +42,7 @@ trait AsyncWriteJournal extends Actor with AsyncRecovery { resequencer ! Desequenced(WriteMessagesFailed(e), cctr, processor, self) resequence(WriteMessageFailure(_, e)) } - resequencerCounter += persistentBatch.length + 1 + resequencerCounter += resequenceables.length + 1 case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, processorId, processor, replayDeleted) ⇒ // Send replayed messages and replay result to processor directly. No need // to resequence replayed messages relative to written and looped messages. diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala index 4ddd433a37..0d43b9e393 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala @@ -15,7 +15,7 @@ import akka.persistence._ /** * Abstract journal, optimized for synchronous writes. */ -trait SyncWriteJournal extends Actor with AsyncRecovery { +trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { import JournalProtocol._ import context.dispatcher @@ -23,14 +23,20 @@ trait SyncWriteJournal extends Actor with AsyncRecovery { private val publish = extension.settings.internal.publishPluginCommands final def receive = { - case WriteMessages(persistentBatch, processor) ⇒ - Try(writeMessages(persistentBatch.map(_.prepareWrite()))) match { + case WriteMessages(resequenceables, processor) ⇒ + Try(writeMessages(preparePersistentBatch(resequenceables))) match { case Success(_) ⇒ processor ! WriteMessagesSuccessful - persistentBatch.foreach(p ⇒ processor.tell(WriteMessageSuccess(p), p.sender)) + resequenceables.foreach { + case p: PersistentRepr ⇒ processor.tell(WriteMessageSuccess(p), p.sender) + case r ⇒ processor.tell(LoopMessageSuccess(r.payload), r.sender) + } case Failure(e) ⇒ processor ! WriteMessagesFailed(e) - persistentBatch.foreach(p ⇒ processor tell (WriteMessageFailure(p, e), p.sender)) + resequenceables.foreach { + case p: PersistentRepr ⇒ processor tell (WriteMessageFailure(p, e), p.sender) + case r ⇒ processor tell (LoopMessageSuccess(r.payload), r.sender) + } throw e } case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, processorId, processor, replayDeleted) ⇒ diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/WriteJournalBase.scala b/akka-persistence/src/main/scala/akka/persistence/journal/WriteJournalBase.scala new file mode 100644 index 0000000000..79d5e685c6 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/WriteJournalBase.scala @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ + +package akka.persistence.journal + +import akka.persistence.{ PersistentRepr, Resequenceable } +import akka.actor.Actor +import scala.collection.immutable + +private[akka] trait WriteJournalBase { + this: Actor ⇒ + + protected def preparePersistentBatch(rb: immutable.Seq[Resequenceable]): immutable.Seq[PersistentRepr] = + rb.filter(persistentPrepareWrite).asInstanceOf[immutable.Seq[PersistentRepr]] // filter instead of flatMap to avoid Some allocations + + private def persistentPrepareWrite(r: Resequenceable): Boolean = r match { + case p: PersistentRepr ⇒ + p.prepareWrite(); true + case _ ⇒ + false + } + +} diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index a2be6b30d5..5b50b7e367 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -86,7 +86,9 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { private def persistentMessageBatchBuilder(persistentBatch: PersistentBatch) = { val builder = PersistentMessageBatch.newBuilder - persistentBatch.persistentReprList.foreach(p ⇒ builder.addBatch(persistentMessageBuilder(p))) + persistentBatch.batch. + filter(_.isInstanceOf[PersistentRepr]). + foreach(p ⇒ builder.addBatch(persistentMessageBuilder(p.asInstanceOf[PersistentRepr]))) builder } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index 179bd46f30..542bcb26b0 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -18,7 +18,7 @@ object PersistentActorSpec { final case class Cmd(data: Any) final case class Evt(data: Any) - abstract class ExampleProcessor(name: String) extends NamedProcessor(name) with PersistentActor { + abstract class ExamplePersistentActor(name: String) extends NamedProcessor(name) with PersistentActor { var events: List[Any] = Nil val updateState: Receive = { @@ -33,14 +33,14 @@ object PersistentActorSpec { def receiveRecover = updateState } - class Behavior1Processor(name: String) extends ExampleProcessor(name) { + class Behavior1Processor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = commonBehavior orElse { case Cmd(data) ⇒ persist(Seq(Evt(s"${data}-1"), Evt(s"${data}-2")))(updateState) } } - class Behavior2Processor(name: String) extends ExampleProcessor(name) { + class Behavior2Processor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = commonBehavior orElse { case Cmd(data) ⇒ persist(Seq(Evt(s"${data}-1"), Evt(s"${data}-2")))(updateState) @@ -48,7 +48,7 @@ object PersistentActorSpec { } } - class Behavior3Processor(name: String) extends ExampleProcessor(name) { + class Behavior3Processor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = commonBehavior orElse { case Cmd(data) ⇒ persist(Seq(Evt(s"${data}-11"), Evt(s"${data}-12")))(updateState) @@ -56,7 +56,7 @@ object PersistentActorSpec { } } - class ChangeBehaviorInLastEventHandlerProcessor(name: String) extends ExampleProcessor(name) { + class ChangeBehaviorInLastEventHandlerProcessor(name: String) extends ExamplePersistentActor(name) { val newBehavior: Receive = { case Cmd(data) ⇒ persist(Evt(s"${data}-21"))(updateState) @@ -75,7 +75,7 @@ object PersistentActorSpec { } } - class ChangeBehaviorInFirstEventHandlerProcessor(name: String) extends ExampleProcessor(name) { + class ChangeBehaviorInFirstEventHandlerProcessor(name: String) extends ExamplePersistentActor(name) { val newBehavior: Receive = { case Cmd(data) ⇒ persist(Evt(s"${data}-21")) { event ⇒ @@ -94,7 +94,7 @@ object PersistentActorSpec { } } - class ChangeBehaviorInCommandHandlerFirstProcessor(name: String) extends ExampleProcessor(name) { + class ChangeBehaviorInCommandHandlerFirstProcessor(name: String) extends ExamplePersistentActor(name) { val newBehavior: Receive = { case Cmd(data) ⇒ context.unbecome() @@ -109,7 +109,7 @@ object PersistentActorSpec { } } - class ChangeBehaviorInCommandHandlerLastProcessor(name: String) extends ExampleProcessor(name) { + class ChangeBehaviorInCommandHandlerLastProcessor(name: String) extends ExamplePersistentActor(name) { val newBehavior: Receive = { case Cmd(data) ⇒ persist(Seq(Evt(s"${data}-31"), Evt(s"${data}-32")))(updateState) @@ -124,7 +124,7 @@ object PersistentActorSpec { } } - class SnapshottingPersistentActor(name: String, probe: ActorRef) extends ExampleProcessor(name) { + class SnapshottingPersistentActor(name: String, probe: ActorRef) extends ExamplePersistentActor(name) { override def receiveRecover = super.receiveRecover orElse { case SnapshotOffer(_, events: List[_]) ⇒ probe ! "offered" @@ -160,14 +160,14 @@ object PersistentActorSpec { } } - class ReplyInEventHandlerProcessor(name: String) extends ExampleProcessor(name) { + class ReplyInEventHandlerProcessor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = { case Cmd("a") ⇒ persist(Evt("a"))(evt ⇒ sender() ! evt.data) case p: Persistent ⇒ sender() ! p // not expected } } - class UserStashProcessor(name: String) extends ExampleProcessor(name) { + class UserStashProcessor(name: String) extends ExamplePersistentActor(name) { var stashed = false val receiveCommand: Receive = { case Cmd("a") ⇒ if (!stashed) { stash(); stashed = true } else sender() ! "a" @@ -176,7 +176,7 @@ object PersistentActorSpec { } } - class UserStashManyProcessor(name: String) extends ExampleProcessor(name) { + class UserStashManyProcessor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = commonBehavior orElse { case Cmd("a") ⇒ persist(Evt("a")) { evt ⇒ updateState(evt) @@ -196,7 +196,7 @@ object PersistentActorSpec { case other ⇒ stash() } } - class AsyncPersistProcessor(name: String) extends ExampleProcessor(name) { + class AsyncPersistProcessor(name: String) extends ExamplePersistentActor(name) { var counter = 0 val receiveCommand: Receive = commonBehavior orElse { @@ -212,7 +212,7 @@ object PersistentActorSpec { counter } } - class AsyncPersistThreeTimesProcessor(name: String) extends ExampleProcessor(name) { + class AsyncPersistThreeTimesProcessor(name: String) extends ExamplePersistentActor(name) { var counter = 0 val receiveCommand: Receive = commonBehavior orElse { @@ -231,7 +231,7 @@ object PersistentActorSpec { counter } } - class AsyncPersistSameEventTwiceProcessor(name: String) extends ExampleProcessor(name) { + class AsyncPersistSameEventTwiceProcessor(name: String) extends ExamplePersistentActor(name) { // atomic because used from inside the *async* callbacks val sendMsgCounter = new AtomicInteger() @@ -249,7 +249,7 @@ object PersistentActorSpec { persistAsync(event) { evt ⇒ sender() ! s"${evt.data}-b-${sendMsgCounter.incrementAndGet()}" } } } - class AsyncPersistAndPersistMixedSyncAsyncSyncProcessor(name: String) extends ExampleProcessor(name) { + class AsyncPersistAndPersistMixedSyncAsyncSyncProcessor(name: String) extends ExamplePersistentActor(name) { var counter = 0 @@ -276,12 +276,10 @@ object PersistentActorSpec { counter } } - class AsyncPersistAndPersistMixedSyncAsyncProcessor(name: String) extends ExampleProcessor(name) { + class AsyncPersistAndPersistMixedSyncAsyncProcessor(name: String) extends ExamplePersistentActor(name) { var sendMsgCounter = 0 - val start = System.currentTimeMillis() - def time = s" ${System.currentTimeMillis() - start}ms" val receiveCommand: Receive = commonBehavior orElse { case Cmd(data) ⇒ sender() ! data @@ -301,7 +299,7 @@ object PersistentActorSpec { } } - class UserStashFailureProcessor(name: String) extends ExampleProcessor(name) { + class UserStashFailureProcessor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = commonBehavior orElse { case Cmd(data) ⇒ if (data == "b-2") throw new TestException("boom") @@ -322,7 +320,7 @@ object PersistentActorSpec { } } - class AnyValEventProcessor(name: String) extends ExampleProcessor(name) { + class AnyValEventProcessor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = { case Cmd("a") ⇒ persist(5)(evt ⇒ sender() ! evt) } @@ -348,6 +346,44 @@ object PersistentActorSpec { } } + class DeferringWithPersistActor(name: String) extends ExamplePersistentActor(name) { + val receiveCommand: Receive = { + case Cmd(data) ⇒ + defer("d-1") { sender() ! _ } + persist(s"$data-2") { sender() ! _ } + defer("d-3") { sender() ! _ } + defer("d-4") { sender() ! _ } + } + } + class DeferringWithAsyncPersistActor(name: String) extends ExamplePersistentActor(name) { + val receiveCommand: Receive = { + case Cmd(data) ⇒ + defer(s"d-$data-1") { sender() ! _ } + persistAsync(s"pa-$data-2") { sender() ! _ } + defer(s"d-$data-3") { sender() ! _ } + defer(s"d-$data-4") { sender() ! _ } + } + } + class DeferringMixedCallsPPADDPADPersistActor(name: String) extends ExamplePersistentActor(name) { + val receiveCommand: Receive = { + case Cmd(data) ⇒ + persist(s"p-$data-1") { sender() ! _ } + persistAsync(s"pa-$data-2") { sender() ! _ } + defer(s"d-$data-3") { sender() ! _ } + defer(s"d-$data-4") { sender() ! _ } + persistAsync(s"pa-$data-5") { sender() ! _ } + defer(s"d-$data-6") { sender() ! _ } + } + } + class DeferringWithNoPersistCallsPersistActor(name: String) extends ExamplePersistentActor(name) { + val receiveCommand: Receive = { + case Cmd(data) ⇒ + defer("d-1") { sender() ! _ } + defer("d-2") { sender() ! _ } + defer("d-3") { sender() ! _ } + } + } + } abstract class PersistentActorSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { @@ -625,6 +661,71 @@ abstract class PersistentActorSpec(config: Config) extends AkkaSpec(config) with expectNoMsg(100.millis) } + "allow deferring handlers in order to provide ordered processing in respect to persist handlers" in { + val processor = namedProcessor[DeferringWithPersistActor] + processor ! Cmd("a") + expectMsg("d-1") + expectMsg("a-2") + expectMsg("d-3") + expectMsg("d-4") + expectNoMsg(100.millis) + } + "allow deferring handlers in order to provide ordered processing in respect to asyncPersist handlers" in { + val processor = namedProcessor[DeferringWithAsyncPersistActor] + processor ! Cmd("a") + expectMsg("d-a-1") + expectMsg("pa-a-2") + expectMsg("d-a-3") + expectMsg("d-a-4") + expectNoMsg(100.millis) + } + "invoke deferred handlers, in presence of mixed a long series persist / persistAsync calls" in { + val processor = namedProcessor[DeferringMixedCallsPPADDPADPersistActor] + val p1, p2 = TestProbe() + + processor.tell(Cmd("a"), p1.ref) + processor.tell(Cmd("b"), p2.ref) + p1.expectMsg("p-a-1") + p1.expectMsg("pa-a-2") + p1.expectMsg("d-a-3") + p1.expectMsg("d-a-4") + p1.expectMsg("pa-a-5") + p1.expectMsg("d-a-6") + + p2.expectMsg("p-b-1") + p2.expectMsg("pa-b-2") + p2.expectMsg("d-b-3") + p2.expectMsg("d-b-4") + p2.expectMsg("pa-b-5") + p2.expectMsg("d-b-6") + + expectNoMsg(100.millis) + } + "invoke deferred handlers right away, if there are no pending persist handlers registered" in { + val processor = namedProcessor[DeferringWithNoPersistCallsPersistActor] + processor ! Cmd("a") + expectMsg("d-1") + expectMsg("d-2") + expectMsg("d-3") + expectNoMsg(100.millis) + } + "invoke deferred handlers, perserving the original sender references" in { + val processor = namedProcessor[DeferringWithAsyncPersistActor] + val p1, p2 = TestProbe() + + processor.tell(Cmd("a"), p1.ref) + processor.tell(Cmd("b"), p2.ref) + p1.expectMsg("d-a-1") + p1.expectMsg("pa-a-2") + p1.expectMsg("d-a-3") + p1.expectMsg("d-a-4") + + p2.expectMsg("d-b-1") + p2.expectMsg("pa-b-2") + p2.expectMsg("d-b-3") + p2.expectMsg("d-b-4") + expectNoMsg(100.millis) + } "receive RecoveryFinished if it is handled after all events have been replayed" in { val processor1 = system.actorOf(Props(classOf[SnapshottingPersistentActor], name, testActor)) processor1 ! Cmd("b") diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java index c924b2238c..a943427f2f 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java @@ -4,17 +4,19 @@ package doc; -import java.util.concurrent.TimeUnit; - +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; import akka.japi.pf.ReceiveBuilder; +import akka.persistence.*; import scala.Option; import scala.PartialFunction; import scala.concurrent.duration.Duration; - -import akka.actor.*; -import akka.persistence.*; import scala.runtime.BoxedUnit; +import java.util.concurrent.TimeUnit; + import static java.util.Arrays.asList; public class LambdaPersistenceDocTest { @@ -359,7 +361,7 @@ public class LambdaPersistenceDocTest { static Object o8 = new Object() { //#reliable-event-delivery - class MyEventsourcedProcessor extends AbstractEventsourcedProcessor { + class MyEventsourcedProcessor extends AbstractPersistentActor { private ActorRef destination; private ActorRef channel; @@ -392,6 +394,103 @@ public class LambdaPersistenceDocTest { }; static Object o9 = new Object() { + //#persist-async + class MyPersistentActor extends AbstractPersistentActor { + + private void handleCommand(String c) { + sender().tell(c, self()); + + persistAsync(String.format("evt-%s-1", c), e -> { + sender().tell(e, self()); + }); + persistAsync(String.format("evt-%s-2", c), e -> { + sender().tell(e, self()); + }); + } + + @Override public PartialFunction receiveRecover() { + return ReceiveBuilder. + match(String.class, this::handleCommand).build(); + } + + @Override public PartialFunction receiveCommand() { + return ReceiveBuilder. + match(String.class, this::handleCommand).build(); + } + } + //#persist-async + + public void usage() { + final ActorSystem system = ActorSystem.create("example"); + //#persist-async-usage + final ActorRef processor = system.actorOf(Props.create(MyPersistentActor.class)); + processor.tell("a", null); + processor.tell("b", null); + + // possible order of received messages: + // a + // b + // evt-a-1 + // evt-a-2 + // evt-b-1 + // evt-b-2 + //#persist-async-usage + } + }; + + + static Object o10 = new Object() { + //#defer + class MyPersistentActor extends AbstractPersistentActor { + + private void handleCommand(String c) { + persistAsync(String.format("evt-%s-1", c), e -> { + sender().tell(e, self()); + }); + persistAsync(String.format("evt-%s-2", c), e -> { + sender().tell(e, self()); + }); + + defer(String.format("evt-%s-3", c), e -> { + sender().tell(e, self()); + }); + } + + @Override public PartialFunction receiveRecover() { + return ReceiveBuilder. + match(String.class, this::handleCommand).build(); + } + + @Override public PartialFunction receiveCommand() { + return ReceiveBuilder. + match(String.class, this::handleCommand).build(); + } + } + //#defer + + public void usage() { + final ActorSystem system = ActorSystem.create("example"); + final ActorRef sender = null; // your imaginary sender here + //#defer-caller + final ActorRef processor = system.actorOf(Props.create(MyPersistentActor.class)); + processor.tell("a", sender); + processor.tell("b", sender); + + // order of received messages: + // a + // b + // evt-a-1 + // evt-a-2 + // evt-a-3 + // evt-b-1 + // evt-b-2 + // evt-b-3 + //#defer-caller + } + }; + + + static Object o11 = new Object() { //#view class MyView extends AbstractView { @Override diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java index 81302af637..a527504a46 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java @@ -48,7 +48,7 @@ class ExampleState implements Serializable { private final ArrayList events; public ExampleState() { - this(new ArrayList()); + this(new ArrayList<>()); } public ExampleState(ArrayList events) { @@ -56,7 +56,7 @@ class ExampleState implements Serializable { } public ExampleState copy() { - return new ExampleState(new ArrayList(events)); + return new ExampleState(new ArrayList<>(events)); } public void update(Evt evt) {