diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index c7bf8927c0..0711eaa96d 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -3,21 +3,21 @@ */ package akka.event -import language.existentials -import akka.actor._ -import akka.{ ConfigurationException, AkkaException } -import akka.actor.ActorSystem.Settings -import akka.util.ReentrantGuard -import java.util.concurrent.atomic.AtomicInteger +import java.util.Locale import java.util.concurrent.TimeoutException +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor.ActorSystem.Settings +import akka.actor._ +import akka.dispatch.RequiresMessageQueue +import akka.util.ReentrantGuard +import akka.{ AkkaException, ConfigurationException } + import scala.annotation.implicitNotFound import scala.collection.immutable -import scala.concurrent.duration._ import scala.concurrent.Await -import scala.util.control.NoStackTrace -import scala.util.control.NonFatal -import java.util.Locale -import akka.dispatch.RequiresMessageQueue +import scala.language.existentials +import scala.util.control.{ NoStackTrace, NonFatal } /** * This trait brings log level handling to the EventStream: it reads the log @@ -1132,10 +1132,12 @@ class DefaultLoggingFilter(logLevel: () ⇒ Logging.LogLevel) extends LoggingFil */ trait DiagnosticLoggingAdapter extends LoggingAdapter { - import Logging._ - import scala.collection.JavaConverters._ import java.{ util ⇒ ju } + import Logging._ + + import scala.collection.JavaConverters._ + private var _mdc = emptyMDC /** diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index 3ad450f5b5..5e458a534f 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -41,7 +41,7 @@ public class PersistenceDocTest { private void recover() { //#recover-explicit - persistentActor.tell(Recover.create(), getSelf()); + persistentActor.tell(Recover.create(), self()); //#recover-explicit } } @@ -64,7 +64,7 @@ public class PersistenceDocTest { //#recover-on-start-custom @Override public void preStart() { - getSelf().tell(Recover.create(457L), getSelf()); + self().tell(Recover.create(457L), self()); } //#recover-on-start-custom } @@ -129,7 +129,7 @@ public class PersistenceDocTest { abstract class MyPersistentActor1 extends UntypedPersistentActor { //#recover-fully-disabled @Override - public void preStart() { getSelf().tell(Recover.create(0L), getSelf()); } + public void preStart() { self().tell(Recover.create(0L), self()); } //#recover-fully-disabled } }; @@ -227,7 +227,7 @@ public class PersistenceDocTest { if (message instanceof Msg) { Msg msg = (Msg) message; // ... - getSender().tell(new Confirm(msg.deliveryId), getSelf()); + getSender().tell(new Confirm(msg.deliveryId), self()); } else { unhandled(message); } @@ -324,7 +324,7 @@ public class PersistenceDocTest { @Override public void onReceiveCommand(Object msg) { - sender().tell(msg, getSelf()); + sender().tell(msg, self()); persistAsync(String.format("evt-%s-1", msg), new Procedure(){ @Override @@ -376,7 +376,7 @@ public class PersistenceDocTest { final Procedure replyToSender = new Procedure() { @Override public void apply(String event) throws Exception { - sender().tell(event, getSelf()); + sender().tell(event, self()); } }; @@ -408,6 +408,140 @@ public class PersistenceDocTest { }; static Object o11 = new Object() { + + class MyPersistentActor extends UntypedPersistentActor { + @Override + public String persistenceId() { + return "my-stable-persistence-id"; + } + + @Override + public void onReceiveRecover(Object msg) { + // handle recovery here + } + + //#nested-persist-persist + @Override + public void onReceiveCommand(Object msg) { + final Procedure replyToSender = new Procedure() { + @Override + public void apply(String event) throws Exception { + sender().tell(event, self()); + } + }; + + final Procedure outer1Callback = new Procedure() { + @Override + public void apply(String event) throws Exception { + sender().tell(event, self()); + persist(String.format("%s-inner-1", msg), replyToSender); + } + }; + final Procedure outer2Callback = new Procedure() { + @Override + public void apply(String event) throws Exception { + sender().tell(event, self()); + persist(String.format("%s-inner-2", msg), replyToSender); + } + }; + + persist(String.format("%s-outer-1", msg), outer1Callback); + persist(String.format("%s-outer-2", msg), outer2Callback); + } + //#nested-persist-persist + + void usage(ActorRef persistentActor) { + //#nested-persist-persist-caller + persistentActor.tell("a", self()); + persistentActor.tell("b", self()); + + // order of received messages: + // a + // a-outer-1 + // a-outer-2 + // a-inner-1 + // a-inner-2 + // and only then process "b" + // b + // b-outer-1 + // b-outer-2 + // b-inner-1 + // b-inner-2 + + //#nested-persist-persist-caller + } + } + + + class MyPersistAsyncActor extends UntypedPersistentActor { + @Override + public String persistenceId() { + return "my-stable-persistence-id"; + } + + @Override + public void onReceiveRecover(Object msg) { + // handle recovery here + } + + //#nested-persistAsync-persistAsync + @Override + public void onReceiveCommand(Object msg) { + final Procedure replyToSender = new Procedure() { + @Override + public void apply(String event) throws Exception { + sender().tell(event, self()); + } + }; + + final Procedure outer1Callback = new Procedure() { + @Override + public void apply(String event) throws Exception { + sender().tell(event, self()); + persistAsync(String.format("%s-inner-1", msg), replyToSender); + } + }; + final Procedure outer2Callback = new Procedure() { + @Override + public void apply(String event) throws Exception { + sender().tell(event, self()); + persistAsync(String.format("%s-inner-1", msg), replyToSender); + } + }; + + persistAsync(String.format("%s-outer-1", msg), outer1Callback); + persistAsync(String.format("%s-outer-2", msg), outer2Callback); + } + //#nested-persistAsync-persistAsync + + + void usage(ActorRef persistentActor) { + //#nested-persistAsync-persistAsync-caller + persistentActor.tell("a", ActorRef.noSender()); + persistentActor.tell("b", ActorRef.noSender()); + + // order of received messages: + // a + // b + // a-outer-1 + // a-outer-2 + // b-outer-1 + // b-outer-2 + // a-inner-1 + // a-inner-2 + // b-inner-1 + // b-inner-2 + + // which can be seen as the following causal relationship: + // a -> a-outer-1 -> a-outer-2 -> a-inner-1 -> a-inner-2 + // b -> b-outer-1 -> b-outer-2 -> b-inner-1 -> b-inner-2 + + //#nested-persistAsync-persistAsync-caller + } + } + }; + + static Object o12 = new Object() { //#view class MyView extends UntypedPersistentView { @Override diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index af84bc61a8..225e2f0b6e 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -249,7 +249,7 @@ The ordering between events is still guaranteed ("evt-b-1" will be sent after "e .. _defer-java-lambda: Deferring actions until preceding persist handlers have executed ------------------------------------------------------------------ +---------------------------------------------------------------- Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of ''happens-after the previous ``persistAsync`` handlers have been invoked''. ``PersistentActor`` provides an utility method @@ -270,6 +270,41 @@ of the command for which this ``defer`` handler was called. The callback will not be invoked if the actor is restarted (or stopped) in between the call to ``defer`` and the journal has processed and confirmed all preceding writes. +.. _nested-persist-calls-lambda: + +Nested persist calls +-------------------- +It is possible to call ``persist`` and ``persistAsync`` inside their respective callback blocks and they will properly +retain both the thread safety (including the right value of ``sender()``) as well as stashing guarantees. + +In general it is encouraged to create command handlers which do not need to resort to nested event persisting, +however there are situations where it may be useful. It is important to understand the ordering of callback execution in +those situations, as well as their implication on the stashing behaviour (that ``persist()`` enforces). In the following +example two persist calls are issued, and each of them issues another persist inside its callback: + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#nested-persist-persist + +When sending two commands to this ``PersistentActor``, the persist handlers will be executed in the following order: + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#nested-persist-persist-caller + +First the "outer layer" of persist calls is issued and their callbacks applied, after these have successfully completed +the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal). +And only after all these handlers have been successfully invoked, the next command will delivered to the persistent Actor. +In other words, the stashing of incoming commands that is guaranteed by initially calling ``persist()`` on the outer layer +is extended until all nested ``persist`` callbacks have been handled. + +It is also possible to nest ``persistAsync`` calls, using the same pattern: + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#nested-persistAsync-persistAsync + +In this case no stashing is happening, yet the events are still persisted and callbacks executed in the expected order: + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#nested-persistAsync-persistAsync-caller + +While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics +it is not a recommended practice as it may lead to overly complex nesting. + Failures -------- diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index d7710d8604..fe90b78751 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -273,6 +273,41 @@ of the command for which this ``defer`` handler was called. The callback will not be invoked if the actor is restarted (or stopped) in between the call to ``defer`` and the journal has processed and confirmed all preceding writes. +.. _nested-persist-calls-java: + +Nested persist calls +-------------------- +It is possible to call ``persist`` and ``persistAsync`` inside their respective callback blocks and they will properly +retain both the thread safety (including the right value of ``sender()``) as well as stashing guarantees. + +In general it is encouraged to create command handlers which do not need to resort to nested event persisting, +however there are situations where it may be useful. It is important to understand the ordering of callback execution in +those situations, as well as their implication on the stashing behaviour (that ``persist()`` enforces). In the following +example two persist calls are issued, and each of them issues another persist inside its callback: + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#nested-persist-persist + +When sending two commands to this ``PersistentActor``, the persist handlers will be executed in the following order: + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#nested-persist-persist-caller + +First the "outer layer" of persist calls is issued and their callbacks applied, after these have successfully completed +the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal). +And only after all these handlers have been successfully invoked, the next command will delivered to the persistent Actor. +In other words, the stashing of incoming commands that is guaranteed by initially calling ``persist()`` on the outer layer +is extended until all nested ``persist`` callbacks have been handled. + +It is also possible to nest ``persistAsync`` calls, using the same pattern: + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#nested-persistAsync-persistAsync + +In this case no stashing is happening, yet the events are still persisted and callbacks executed in the expected order: + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#nested-persistAsync-persistAsync-caller + +While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics +it is not a recommended practice as it may lead to overly complex nesting. + Failures -------- diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 47a2dca14c..aac0379da3 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -280,6 +280,103 @@ object PersistenceDocSpec { //#defer-caller } + object NestedPersists { + + class MyPersistentActor extends PersistentActor { + override def persistenceId = "my-stable-persistence-id" + + override def receiveRecover: Receive = { + case _ => // handle recovery here + } + + //#nested-persist-persist + override def receiveCommand: Receive = { + case c: String => + sender() ! c + + persist(s"$c-1-outer") { outer1 => + sender() ! outer1 + persist(s"$c-1-inner") { inner1 => + sender() ! inner1 + } + } + + persist(s"$c-2-outer") { outer2 => + sender() ! outer2 + persist(s"$c-2-inner") { inner2 => + sender() ! inner2 + } + } + } + //#nested-persist-persist + } + + //#nested-persist-persist-caller + persistentActor ! "a" + persistentActor ! "b" + + // order of received messages: + // a + // a-outer-1 + // a-outer-2 + // a-inner-1 + // a-inner-2 + // and only then process "b" + // b + // b-outer-1 + // b-outer-2 + // b-inner-1 + // b-inner-2 + + //#nested-persist-persist-caller + + + class MyPersistAsyncActor extends PersistentActor { + override def persistenceId = "my-stable-persistence-id" + + override def receiveRecover: Receive = { + case _ => // handle recovery here + } + + //#nested-persistAsync-persistAsync + override def receiveCommand: Receive = { + case c: String => + sender() ! c + persistAsync(c + "-outer-1") { outer ⇒ + sender() ! outer + persistAsync(c + "-inner-1") { inner ⇒ sender() ! inner } + } + persistAsync(c + "-outer-2") { outer ⇒ + sender() ! outer + persistAsync(c + "-inner-2") { inner ⇒ sender() ! inner } + } + } + //#nested-persistAsync-persistAsync + } + + //#nested-persistAsync-persistAsync-caller + persistentActor ! "a" + persistentActor ! "b" + + // order of received messages: + // a + // b + // a-outer-1 + // a-outer-2 + // b-outer-1 + // b-outer-2 + // a-inner-1 + // a-inner-2 + // b-inner-1 + // b-inner-2 + + // which can be seen as the following causal relationship: + // a -> a-outer-1 -> a-outer-2 -> a-inner-1 -> a-inner-2 + // b -> b-outer-1 -> b-outer-2 -> b-inner-1 -> b-inner-2 + + //#nested-persistAsync-persistAsync-caller + } + object View { import akka.actor.Props diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index d19bab7a91..0be4b701da 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -236,7 +236,7 @@ The ordering between events is still guaranteed ("evt-b-1" will be sent after "e .. _defer-scala: Deferring actions until preceding persist handlers have executed ------------------------------------------------------------------ +---------------------------------------------------------------- Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of ''happens-after the previous ``persistAsync`` handlers have been invoked''. ``PersistentActor`` provides an utility method @@ -259,6 +259,41 @@ The calling side will get the responses in this (guaranteed) order: The callback will not be invoked if the actor is restarted (or stopped) in between the call to ``defer`` and the journal has processed and confirmed all preceding writes. +.. _nested-persist-calls-scala: + +Nested persist calls +-------------------- +It is possible to call ``persist`` and ``persistAsync`` inside their respective callback blocks and they will properly +retain both the thread safety (including the right value of ``sender()``) as well as stashing guarantees. + +In general it is encouraged to create command handlers which do not need to resort to nested event persisting, +however there are situations where it may be useful. It is important to understand the ordering of callback execution in +those situations, as well as their implication on the stashing behaviour (that ``persist()`` enforces). In the following +example two persist calls are issued, and each of them issues another persist inside its callback: + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#nested-persist-persist + +When sending two commands to this ``PersistentActor``, the persist handlers will be executed in the following order: + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#nested-persist-persist-caller + +First the "outer layer" of persist calls is issued and their callbacks applied, after these have successfully completed +the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal). +And only after all these handlers have been successfully invoked, the next command will delivered to the persistent Actor. +In other words, the stashing of incoming commands that is guaranteed by initially calling ``persist()`` on the outer layer +is extended until all nested ``persist`` callbacks have been handled. + +It is also possible to nest ``persistAsync`` calls, using the same pattern: + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#nested-persistAsync-persistAsync + +In this case no stashing is happening, yet the events are still persisted and callbacks executed in the expected order: + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#nested-persistAsync-persistAsync-caller + +While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics +it is not a recommended practice as it may lead to overly complex nesting. + Failures -------- diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 91d873ffda..96e1e16034 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -522,6 +522,44 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas } } + private def flushBatch() { + def addToBatch(p: PersistentEnvelope): Unit = p match { + case a: AtomicWrite ⇒ + journalBatch :+= a.copy(payload = + a.payload.map(_.update(persistenceId = persistenceId, sequenceNr = nextSequenceNr(), writerUuid = writerUuid))) + case r: PersistentEnvelope ⇒ + journalBatch :+= r + } + + def maxBatchSizeReached: Boolean = + journalBatch.size >= maxMessageBatchSize + + // When using only `persistAsync` and `defer` max throughput is increased by using + // batching, but when using `persist` we want to use one atomic WriteMessages + // for the emitted events. + // Flush previously collected events, if any, separately from the `persist` batch + if (pendingStashingPersistInvocations > 0 && journalBatch.nonEmpty) + flushJournalBatch() + + eventBatch.reverse.foreach { p ⇒ + addToBatch(p) + if (!writeInProgress || maxBatchSizeReached) flushJournalBatch() + } + + eventBatch = Nil + } + + private def peekApplyHandler(payload: Any): Unit = { + val batchSizeBeforeApply = eventBatch.size + try pendingInvocations.peek().handler(payload) + finally { + val batchSizeAfterApply = eventBatch.size + + if (batchSizeAfterApply > batchSizeBeforeApply) + flushBatch() + } + } + /** * Common receive handler for processingCommands and persistingEvents */ @@ -533,7 +571,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas if (id == instanceId) { updateLastSequenceNr(p) try { - pendingInvocations.peek().handler(p.payload) + peekApplyHandler(p.payload) onWriteMessageComplete(err = false) } catch { case NonFatal(e) ⇒ onWriteMessageComplete(err = true); throw e } } @@ -598,22 +636,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas internalStash.unstash() } - private def flushBatch() { - // When using only `persistAsync` and `defer` max throughput is increased by using - // batching, but when using `persist` we want to use one atomic WriteMessages - // for the emitted events. - // Flush previously collected events, if any, separately from the `persist` batch - if (pendingStashingPersistInvocations > 0 && journalBatch.nonEmpty) - flushJournalBatch() - - eventBatch.reverse.foreach { p ⇒ - addToBatch(p) - if (!writeInProgress || maxBatchSizeReached) flushJournalBatch() - } - - eventBatch = Nil - } - private def addToBatch(p: PersistentEnvelope): Unit = p match { case a: AtomicWrite ⇒ journalBatch :+= a.copy(payload = diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala index 69dccdf7be..e10d4ddf40 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala @@ -7,6 +7,9 @@ package akka.persistence import java.io.File import java.util.concurrent.atomic.AtomicInteger +import org.scalatest.matchers.{ MatchResult, Matcher } + +import scala.collection.immutable import scala.reflect.ClassTag import scala.util.control.NoStackTrace @@ -18,7 +21,8 @@ import org.scalatest.BeforeAndAfterEach import akka.actor.Props import akka.testkit.AkkaSpec -abstract class PersistenceSpec(config: Config) extends AkkaSpec(config) with BeforeAndAfterEach with Cleanup { this: AkkaSpec ⇒ +abstract class PersistenceSpec(config: Config) extends AkkaSpec(config) with BeforeAndAfterEach with Cleanup + with PersistenceMatchers { this: AkkaSpec ⇒ private var _name: String = _ lazy val extension = Persistence(system) @@ -87,3 +91,28 @@ trait TurnOffRecoverOnStart { this: Eventsourced ⇒ class TestException(msg: String) extends Exception(msg) with NoStackTrace case object GetState + +/** Additional ScalaTest matchers useful in persistence tests */ +trait PersistenceMatchers { + /** Use this matcher to verify in-order execution of independent "streams" of events */ + final class IndependentlyOrdered(prefixes: immutable.Seq[String]) extends Matcher[immutable.Seq[Any]] { + override def apply(_left: immutable.Seq[Any]) = { + val left = _left.map(_.toString) + val mapped = left.groupBy(l ⇒ prefixes.indexWhere(p ⇒ l.startsWith(p))) - (-1) // ignore other messages + val results = for { + (pos, seq) ← mapped + nrs = seq.map(_.replaceFirst(prefixes(pos), "").toInt) + sortedNrs = nrs.sorted + if nrs != sortedNrs + } yield MatchResult( + false, + s"""Messages sequence with prefix ${prefixes(pos)} was not sorted! Was: $seq"""", + s"""Messages sequence with prefix ${prefixes(pos)} was sorted! Was: $seq"""") + + if (results.forall(_.matches)) MatchResult(true, "", "") + else results.find(r ⇒ !r.matches).get + } + } + + def beIndependentlyOrdered(prefixes: String*) = new IndependentlyOrdered(prefixes.toList) +} \ No newline at end of file diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index 945591afa7..1e9f4161f4 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -9,11 +9,13 @@ import java.util.concurrent.atomic.AtomicInteger import akka.actor._ import akka.testkit.{ AkkaSpec, ImplicitSender, TestLatch, TestProbe } import com.typesafe.config.Config +import org.scalatest.matchers.{ MatchResult, Matcher } import scala.collection.immutable.Seq -import scala.concurrent.Await +import scala.collection.immutable +import scala.concurrent.{ Future, Await } import scala.concurrent.duration._ -import scala.util.Random +import scala.util.{ Try, Random } import scala.util.control.NoStackTrace object PersistentActorSpec { @@ -424,6 +426,130 @@ object PersistentActorSpec { } } + class MultipleAndNestedPersists(name: String, probe: ActorRef) extends ExamplePersistentActor(name) { + val receiveCommand: Receive = { + case s: String ⇒ + probe ! s + persist(s + "-outer-1") { outer ⇒ + probe ! outer + persist(s + "-inner-1") { inner ⇒ probe ! inner } + } + persist(s + "-outer-2") { outer ⇒ + probe ! outer + persist(s + "-inner-2") { inner ⇒ probe ! inner } + } + } + } + class MultipleAndNestedPersistAsyncs(name: String, probe: ActorRef) extends ExamplePersistentActor(name) { + val receiveCommand: Receive = { + case s: String ⇒ + probe ! s + persistAsync(s + "-outer-1") { outer ⇒ + probe ! outer + persistAsync(s + "-inner-1") { inner ⇒ probe ! inner } + } + persistAsync(s + "-outer-2") { outer ⇒ + probe ! outer + persistAsync(s + "-inner-2") { inner ⇒ probe ! inner } + } + } + } + class DeeplyNestedPersistAsyncs(name: String, maxDepth: Int, probe: ActorRef) extends ExamplePersistentActor(name) { + var currentDepths = Map.empty[String, Int].withDefaultValue(1) + + def weMustGoDeeper: String ⇒ Unit = { dWithDepth ⇒ + val d = dWithDepth.split("-").head + probe ! dWithDepth + if (currentDepths(d) < maxDepth) { + currentDepths = currentDepths.updated(d, currentDepths(d) + 1) + persistAsync(d + "-" + currentDepths(d))(weMustGoDeeper) + } else { + // reset depth counter before next command + currentDepths = currentDepths.updated(d, 1) + } + } + + val receiveCommand: Receive = { + case s: String ⇒ + probe ! s + persistAsync(s + "-" + 1)(weMustGoDeeper) + } + } + + class NestedPersistNormalAndAsyncs(name: String, probe: ActorRef) extends ExamplePersistentActor(name) { + val receiveCommand: Receive = { + case s: String ⇒ + probe ! s + persist(s + "-outer-1") { outer ⇒ + probe ! outer + persistAsync(s + "-inner-async-1") { inner ⇒ + probe ! inner + } + } + persist(s + "-outer-2") { outer ⇒ + probe ! outer + persistAsync(s + "-inner-async-2") { inner ⇒ + probe ! inner + } + } + } + } + class NestedPersistAsyncsAndNormal(name: String, probe: ActorRef) extends ExamplePersistentActor(name) { + val receiveCommand: Receive = { + case s: String ⇒ + probe ! s + persistAsync(s + "-outer-async-1") { outer ⇒ + probe ! outer + persist(s + "-inner-1") { inner ⇒ + probe ! inner + } + } + persistAsync(s + "-outer-async-2") { outer ⇒ + probe ! outer + persist(s + "-inner-2") { inner ⇒ + probe ! inner + } + } + } + } + class NestedPersistInAsyncEnforcesStashing(name: String, probe: ActorRef) extends ExamplePersistentActor(name) { + val receiveCommand: Receive = { + case s: String ⇒ + probe ! s + persistAsync(s + "-outer-async") { outer ⇒ + probe ! outer + persist(s + "-inner") { inner ⇒ + probe ! inner + Thread.sleep(1000) // really long wait here... + // the next incoming command must be handled by the following function + context.become({ case _ ⇒ sender() ! "done" }) + } + } + } + } + + class DeeplyNestedPersists(name: String, maxDepth: Int, probe: ActorRef) extends ExamplePersistentActor(name) { + var currentDepths = Map.empty[String, Int].withDefaultValue(1) + + def weMustGoDeeper: String ⇒ Unit = { dWithDepth ⇒ + val d = dWithDepth.split("-").head + probe ! dWithDepth + if (currentDepths(d) < maxDepth) { + currentDepths = currentDepths.updated(d, currentDepths(d) + 1) + persist(d + "-" + currentDepths(d))(weMustGoDeeper) + } else { + // reset depth counter before next command + currentDepths = currentDepths.updated(d, 1) + } + } + + val receiveCommand: Receive = { + case s: String ⇒ + probe ! s + persist(s + "-" + 1)(weMustGoDeeper) + } + } + class StackableTestPersistentActor(val probe: ActorRef) extends StackableTestPersistentActor.BaseActor with PersistentActor with StackableTestPersistentActor.MixinActor { override def persistenceId: String = "StackableTestPersistentActor" @@ -891,6 +1017,88 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectNoMsg(100.millis) } + "allow multiple persists with nested persist calls" in { + val persistentActor = system.actorOf(Props(classOf[MultipleAndNestedPersists], name, testActor)) + persistentActor ! "a" + persistentActor ! "b" + + expectMsg("a") + expectMsg("a-outer-1") + expectMsg("a-outer-2") + expectMsg("a-inner-1") + expectMsg("a-inner-2") + // and only then process "b" + expectMsg("b") + expectMsg("b-outer-1") + expectMsg("b-outer-2") + expectMsg("b-inner-1") + expectMsg("b-inner-2") + } + "allow multiple persistAsyncs with nested persistAsync calls" in { + val persistentActor = system.actorOf(Props(classOf[MultipleAndNestedPersistAsyncs], name, testActor)) + persistentActor ! "a" + persistentActor ! "b" + + val msgs = receiveN(10).map(_.toString) + val as = msgs.filter(_ startsWith "a") + val bs = msgs.filter(_ startsWith "b") + as should equal(List("a", "a-outer-1", "a-outer-2", "a-inner-1", "a-inner-2")) + bs should equal(List("b", "b-outer-1", "b-outer-2", "b-inner-1", "b-inner-2")) + } + "allow deeply nested persist calls" in { + val nestedPersists = 6 + + val persistentActor = system.actorOf(Props(classOf[DeeplyNestedPersists], name, nestedPersists, testActor)) + persistentActor ! "a" + persistentActor ! "b" + + expectMsg("a") + receiveN(6) should ===((1 to nestedPersists).map("a-" + _)) + // and only then process "b" + expectMsg("b") + receiveN(6) should ===((1 to nestedPersists).map("b-" + _)) + } + "allow deeply nested persistAsync calls" in { + val nestedPersistAsyncs = 6 + + val persistentActor = system.actorOf(Props(classOf[DeeplyNestedPersistAsyncs], name, nestedPersistAsyncs, testActor)) + + persistentActor ! "a" + expectMsg("a") + val got = receiveN(nestedPersistAsyncs) + got should beIndependentlyOrdered("a-") + + persistentActor ! "b" + persistentActor ! "c" + val expectedReplies = 2 + (nestedPersistAsyncs * 2) + receiveN(expectedReplies).map(_.toString) should beIndependentlyOrdered("b-", "c-") + } + "allow mixed nesting of persistAsync in persist calls" in { + val persistentActor = system.actorOf(Props(classOf[NestedPersistNormalAndAsyncs], name, testActor)) + persistentActor ! "a" + + expectMsg("a") + receiveN(4) should equal(List("a-outer-1", "a-outer-2", "a-inner-async-1", "a-inner-async-2")) + } + "allow mixed nesting of persist in persistAsync calls" in { + val persistentActor = system.actorOf(Props(classOf[NestedPersistAsyncsAndNormal], name, testActor)) + persistentActor ! "a" + + expectMsg("a") + receiveN(4) should equal(List("a-outer-async-1", "a-outer-async-2", "a-inner-1", "a-inner-2")) + } + "make sure persist retains promised semantics when nested in persistAsync callback xoxo" in { + val persistentActor = system.actorOf(Props(classOf[NestedPersistInAsyncEnforcesStashing], name, testActor)) + persistentActor ! "a" + + expectMsg("a") + expectMsg("a-outer-async") + expectMsg("a-inner") + persistentActor ! "b" + expectMsg("done") + // which means that b only got applied after the inner persist() handler finished + // so it keeps the persist() semantics, even though we should not recommend this style it can come in handy I guess + } } diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java index 65b6c2caba..3c40fbfa4e 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java @@ -9,6 +9,7 @@ import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.japi.Procedure; import akka.japi.pf.ReceiveBuilder; import akka.persistence.*; import scala.Option; @@ -34,11 +35,9 @@ public class LambdaPersistenceDocTest { static Object o1 = new Object() { - private void recover() { - ActorRef persistentActor =null; - + private void recover(ActorRef persistentActor) { //#recover-explicit - persistentActor.tell(Recover.create(), null); + persistentActor.tell(Recover.create(), ActorRef.noSender()); //#recover-explicit } @@ -425,8 +424,118 @@ public class LambdaPersistenceDocTest { } }; - static Object o11 = new Object() { + + class MyPersistentActor extends AbstractPersistentActor { + @Override + public String persistenceId() { + return "my-stable-persistence-id"; + } + + @Override public PartialFunction receiveCommand() { + return ReceiveBuilder.matchAny(event -> {}).build(); + } + + //#nested-persist-persist + @Override public PartialFunction receiveRecover() { + final Procedure replyToSender = event -> sender().tell(event, self()); + + return ReceiveBuilder + .match(String.class, msg -> { + persist(String.format("%s-outer-1", msg), event -> { + sender().tell(event, self()); + persist(String.format("%s-inner-1", event), replyToSender); + }); + + persist(String.format("%s-outer-2", msg), event -> { + sender().tell(event, self()); + persist(String.format("%s-inner-2", event), replyToSender); + }); + }) + .build(); + } + //#nested-persist-persist + + void usage(ActorRef persistentActor) { + //#nested-persist-persist-caller + persistentActor.tell("a", ActorRef.noSender()); + persistentActor.tell("b", ActorRef.noSender()); + + // order of received messages: + // a + // a-outer-1 + // a-outer-2 + // a-inner-1 + // a-inner-2 + // and only then process "b" + // b + // b-outer-1 + // b-outer-2 + // b-inner-1 + // b-inner-2 + + //#nested-persist-persist-caller + } + } + + + class MyPersistAsyncActor extends AbstractPersistentActor { + @Override + public String persistenceId() { + return "my-stable-persistence-id"; + } + + @Override public PartialFunction receiveCommand() { + return ReceiveBuilder.matchAny(event -> {}).build(); + } + + //#nested-persistAsync-persistAsync + @Override public PartialFunction receiveRecover() { + final Procedure replyToSender = event -> sender().tell(event, self()); + + return ReceiveBuilder + .match(String.class, msg -> { + persistAsync(String.format("%s-outer-1", msg ), event -> { + sender().tell(event, self()); + persistAsync(String.format("%s-inner-1", event), replyToSender); + }); + + persistAsync(String.format("%s-outer-2", msg ), event -> { + sender().tell(event, self()); + persistAsync(String.format("%s-inner-1", event), replyToSender); + }); + }) + .build(); + } + //#nested-persistAsync-persistAsync + + void usage(ActorRef persistentActor) { + //#nested-persistAsync-persistAsync-caller + persistentActor.tell("a", self()); + persistentActor.tell("b", self()); + + // order of received messages: + // a + // b + // a-outer-1 + // a-outer-2 + // b-outer-1 + // b-outer-2 + // a-inner-1 + // a-inner-2 + // b-inner-1 + // b-inner-2 + + // which can be seen as the following causal relationship: + // a -> a-outer-1 -> a-outer-2 -> a-inner-1 -> a-inner-2 + // b -> b-outer-1 -> b-outer-2 -> b-inner-1 -> b-inner-2 + + //#nested-persistAsync-persistAsync-caller + } + } + }; + + static Object o12 = new Object() { //#view class MyView extends AbstractPersistentView { @Override public String persistenceId() { return "some-persistence-id"; }