=per #15640 support nested persist/persistAsync
This commit is contained in:
parent
17760c020c
commit
a59c9f73b6
10 changed files with 752 additions and 46 deletions
|
|
@ -3,21 +3,21 @@
|
|||
*/
|
||||
package akka.event
|
||||
|
||||
import language.existentials
|
||||
import akka.actor._
|
||||
import akka.{ ConfigurationException, AkkaException }
|
||||
import akka.actor.ActorSystem.Settings
|
||||
import akka.util.ReentrantGuard
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.Locale
|
||||
import java.util.concurrent.TimeoutException
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
import akka.actor.ActorSystem.Settings
|
||||
import akka.actor._
|
||||
import akka.dispatch.RequiresMessageQueue
|
||||
import akka.util.ReentrantGuard
|
||||
import akka.{ AkkaException, ConfigurationException }
|
||||
|
||||
import scala.annotation.implicitNotFound
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.Await
|
||||
import scala.util.control.NoStackTrace
|
||||
import scala.util.control.NonFatal
|
||||
import java.util.Locale
|
||||
import akka.dispatch.RequiresMessageQueue
|
||||
import scala.language.existentials
|
||||
import scala.util.control.{ NoStackTrace, NonFatal }
|
||||
|
||||
/**
|
||||
* This trait brings log level handling to the EventStream: it reads the log
|
||||
|
|
@ -1132,10 +1132,12 @@ class DefaultLoggingFilter(logLevel: () ⇒ Logging.LogLevel) extends LoggingFil
|
|||
*/
|
||||
trait DiagnosticLoggingAdapter extends LoggingAdapter {
|
||||
|
||||
import Logging._
|
||||
import scala.collection.JavaConverters._
|
||||
import java.{ util ⇒ ju }
|
||||
|
||||
import Logging._
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
private var _mdc = emptyMDC
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ public class PersistenceDocTest {
|
|||
|
||||
private void recover() {
|
||||
//#recover-explicit
|
||||
persistentActor.tell(Recover.create(), getSelf());
|
||||
persistentActor.tell(Recover.create(), self());
|
||||
//#recover-explicit
|
||||
}
|
||||
}
|
||||
|
|
@ -64,7 +64,7 @@ public class PersistenceDocTest {
|
|||
//#recover-on-start-custom
|
||||
@Override
|
||||
public void preStart() {
|
||||
getSelf().tell(Recover.create(457L), getSelf());
|
||||
self().tell(Recover.create(457L), self());
|
||||
}
|
||||
//#recover-on-start-custom
|
||||
}
|
||||
|
|
@ -129,7 +129,7 @@ public class PersistenceDocTest {
|
|||
abstract class MyPersistentActor1 extends UntypedPersistentActor {
|
||||
//#recover-fully-disabled
|
||||
@Override
|
||||
public void preStart() { getSelf().tell(Recover.create(0L), getSelf()); }
|
||||
public void preStart() { self().tell(Recover.create(0L), self()); }
|
||||
//#recover-fully-disabled
|
||||
}
|
||||
};
|
||||
|
|
@ -227,7 +227,7 @@ public class PersistenceDocTest {
|
|||
if (message instanceof Msg) {
|
||||
Msg msg = (Msg) message;
|
||||
// ...
|
||||
getSender().tell(new Confirm(msg.deliveryId), getSelf());
|
||||
getSender().tell(new Confirm(msg.deliveryId), self());
|
||||
} else {
|
||||
unhandled(message);
|
||||
}
|
||||
|
|
@ -324,7 +324,7 @@ public class PersistenceDocTest {
|
|||
|
||||
@Override
|
||||
public void onReceiveCommand(Object msg) {
|
||||
sender().tell(msg, getSelf());
|
||||
sender().tell(msg, self());
|
||||
|
||||
persistAsync(String.format("evt-%s-1", msg), new Procedure<String>(){
|
||||
@Override
|
||||
|
|
@ -376,7 +376,7 @@ public class PersistenceDocTest {
|
|||
final Procedure<String> replyToSender = new Procedure<String>() {
|
||||
@Override
|
||||
public void apply(String event) throws Exception {
|
||||
sender().tell(event, getSelf());
|
||||
sender().tell(event, self());
|
||||
}
|
||||
};
|
||||
|
||||
|
|
@ -408,6 +408,140 @@ public class PersistenceDocTest {
|
|||
};
|
||||
|
||||
static Object o11 = new Object() {
|
||||
|
||||
class MyPersistentActor extends UntypedPersistentActor {
|
||||
@Override
|
||||
public String persistenceId() {
|
||||
return "my-stable-persistence-id";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceiveRecover(Object msg) {
|
||||
// handle recovery here
|
||||
}
|
||||
|
||||
//#nested-persist-persist
|
||||
@Override
|
||||
public void onReceiveCommand(Object msg) {
|
||||
final Procedure<String> replyToSender = new Procedure<String>() {
|
||||
@Override
|
||||
public void apply(String event) throws Exception {
|
||||
sender().tell(event, self());
|
||||
}
|
||||
};
|
||||
|
||||
final Procedure<String> outer1Callback = new Procedure<String>() {
|
||||
@Override
|
||||
public void apply(String event) throws Exception {
|
||||
sender().tell(event, self());
|
||||
persist(String.format("%s-inner-1", msg), replyToSender);
|
||||
}
|
||||
};
|
||||
final Procedure<String> outer2Callback = new Procedure<String>() {
|
||||
@Override
|
||||
public void apply(String event) throws Exception {
|
||||
sender().tell(event, self());
|
||||
persist(String.format("%s-inner-2", msg), replyToSender);
|
||||
}
|
||||
};
|
||||
|
||||
persist(String.format("%s-outer-1", msg), outer1Callback);
|
||||
persist(String.format("%s-outer-2", msg), outer2Callback);
|
||||
}
|
||||
//#nested-persist-persist
|
||||
|
||||
void usage(ActorRef persistentActor) {
|
||||
//#nested-persist-persist-caller
|
||||
persistentActor.tell("a", self());
|
||||
persistentActor.tell("b", self());
|
||||
|
||||
// order of received messages:
|
||||
// a
|
||||
// a-outer-1
|
||||
// a-outer-2
|
||||
// a-inner-1
|
||||
// a-inner-2
|
||||
// and only then process "b"
|
||||
// b
|
||||
// b-outer-1
|
||||
// b-outer-2
|
||||
// b-inner-1
|
||||
// b-inner-2
|
||||
|
||||
//#nested-persist-persist-caller
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class MyPersistAsyncActor extends UntypedPersistentActor {
|
||||
@Override
|
||||
public String persistenceId() {
|
||||
return "my-stable-persistence-id";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceiveRecover(Object msg) {
|
||||
// handle recovery here
|
||||
}
|
||||
|
||||
//#nested-persistAsync-persistAsync
|
||||
@Override
|
||||
public void onReceiveCommand(Object msg) {
|
||||
final Procedure<String> replyToSender = new Procedure<String>() {
|
||||
@Override
|
||||
public void apply(String event) throws Exception {
|
||||
sender().tell(event, self());
|
||||
}
|
||||
};
|
||||
|
||||
final Procedure<String> outer1Callback = new Procedure<String>() {
|
||||
@Override
|
||||
public void apply(String event) throws Exception {
|
||||
sender().tell(event, self());
|
||||
persistAsync(String.format("%s-inner-1", msg), replyToSender);
|
||||
}
|
||||
};
|
||||
final Procedure<String> outer2Callback = new Procedure<String>() {
|
||||
@Override
|
||||
public void apply(String event) throws Exception {
|
||||
sender().tell(event, self());
|
||||
persistAsync(String.format("%s-inner-1", msg), replyToSender);
|
||||
}
|
||||
};
|
||||
|
||||
persistAsync(String.format("%s-outer-1", msg), outer1Callback);
|
||||
persistAsync(String.format("%s-outer-2", msg), outer2Callback);
|
||||
}
|
||||
//#nested-persistAsync-persistAsync
|
||||
|
||||
|
||||
void usage(ActorRef persistentActor) {
|
||||
//#nested-persistAsync-persistAsync-caller
|
||||
persistentActor.tell("a", ActorRef.noSender());
|
||||
persistentActor.tell("b", ActorRef.noSender());
|
||||
|
||||
// order of received messages:
|
||||
// a
|
||||
// b
|
||||
// a-outer-1
|
||||
// a-outer-2
|
||||
// b-outer-1
|
||||
// b-outer-2
|
||||
// a-inner-1
|
||||
// a-inner-2
|
||||
// b-inner-1
|
||||
// b-inner-2
|
||||
|
||||
// which can be seen as the following causal relationship:
|
||||
// a -> a-outer-1 -> a-outer-2 -> a-inner-1 -> a-inner-2
|
||||
// b -> b-outer-1 -> b-outer-2 -> b-inner-1 -> b-inner-2
|
||||
|
||||
//#nested-persistAsync-persistAsync-caller
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
static Object o12 = new Object() {
|
||||
//#view
|
||||
class MyView extends UntypedPersistentView {
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -249,7 +249,7 @@ The ordering between events is still guaranteed ("evt-b-1" will be sent after "e
|
|||
.. _defer-java-lambda:
|
||||
|
||||
Deferring actions until preceding persist handlers have executed
|
||||
-----------------------------------------------------------------
|
||||
----------------------------------------------------------------
|
||||
|
||||
Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of
|
||||
''happens-after the previous ``persistAsync`` handlers have been invoked''. ``PersistentActor`` provides an utility method
|
||||
|
|
@ -270,6 +270,41 @@ of the command for which this ``defer`` handler was called.
|
|||
The callback will not be invoked if the actor is restarted (or stopped) in between the call to
|
||||
``defer`` and the journal has processed and confirmed all preceding writes.
|
||||
|
||||
.. _nested-persist-calls-lambda:
|
||||
|
||||
Nested persist calls
|
||||
--------------------
|
||||
It is possible to call ``persist`` and ``persistAsync`` inside their respective callback blocks and they will properly
|
||||
retain both the thread safety (including the right value of ``sender()``) as well as stashing guarantees.
|
||||
|
||||
In general it is encouraged to create command handlers which do not need to resort to nested event persisting,
|
||||
however there are situations where it may be useful. It is important to understand the ordering of callback execution in
|
||||
those situations, as well as their implication on the stashing behaviour (that ``persist()`` enforces). In the following
|
||||
example two persist calls are issued, and each of them issues another persist inside its callback:
|
||||
|
||||
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#nested-persist-persist
|
||||
|
||||
When sending two commands to this ``PersistentActor``, the persist handlers will be executed in the following order:
|
||||
|
||||
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#nested-persist-persist-caller
|
||||
|
||||
First the "outer layer" of persist calls is issued and their callbacks applied, after these have successfully completed
|
||||
the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal).
|
||||
And only after all these handlers have been successfully invoked, the next command will delivered to the persistent Actor.
|
||||
In other words, the stashing of incoming commands that is guaranteed by initially calling ``persist()`` on the outer layer
|
||||
is extended until all nested ``persist`` callbacks have been handled.
|
||||
|
||||
It is also possible to nest ``persistAsync`` calls, using the same pattern:
|
||||
|
||||
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#nested-persistAsync-persistAsync
|
||||
|
||||
In this case no stashing is happening, yet the events are still persisted and callbacks executed in the expected order:
|
||||
|
||||
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#nested-persistAsync-persistAsync-caller
|
||||
|
||||
While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics
|
||||
it is not a recommended practice as it may lead to overly complex nesting.
|
||||
|
||||
Failures
|
||||
--------
|
||||
|
||||
|
|
|
|||
|
|
@ -273,6 +273,41 @@ of the command for which this ``defer`` handler was called.
|
|||
The callback will not be invoked if the actor is restarted (or stopped) in between the call to
|
||||
``defer`` and the journal has processed and confirmed all preceding writes.
|
||||
|
||||
.. _nested-persist-calls-java:
|
||||
|
||||
Nested persist calls
|
||||
--------------------
|
||||
It is possible to call ``persist`` and ``persistAsync`` inside their respective callback blocks and they will properly
|
||||
retain both the thread safety (including the right value of ``sender()``) as well as stashing guarantees.
|
||||
|
||||
In general it is encouraged to create command handlers which do not need to resort to nested event persisting,
|
||||
however there are situations where it may be useful. It is important to understand the ordering of callback execution in
|
||||
those situations, as well as their implication on the stashing behaviour (that ``persist()`` enforces). In the following
|
||||
example two persist calls are issued, and each of them issues another persist inside its callback:
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#nested-persist-persist
|
||||
|
||||
When sending two commands to this ``PersistentActor``, the persist handlers will be executed in the following order:
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#nested-persist-persist-caller
|
||||
|
||||
First the "outer layer" of persist calls is issued and their callbacks applied, after these have successfully completed
|
||||
the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal).
|
||||
And only after all these handlers have been successfully invoked, the next command will delivered to the persistent Actor.
|
||||
In other words, the stashing of incoming commands that is guaranteed by initially calling ``persist()`` on the outer layer
|
||||
is extended until all nested ``persist`` callbacks have been handled.
|
||||
|
||||
It is also possible to nest ``persistAsync`` calls, using the same pattern:
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#nested-persistAsync-persistAsync
|
||||
|
||||
In this case no stashing is happening, yet the events are still persisted and callbacks executed in the expected order:
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#nested-persistAsync-persistAsync-caller
|
||||
|
||||
While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics
|
||||
it is not a recommended practice as it may lead to overly complex nesting.
|
||||
|
||||
Failures
|
||||
--------
|
||||
|
||||
|
|
|
|||
|
|
@ -280,6 +280,103 @@ object PersistenceDocSpec {
|
|||
//#defer-caller
|
||||
}
|
||||
|
||||
object NestedPersists {
|
||||
|
||||
class MyPersistentActor extends PersistentActor {
|
||||
override def persistenceId = "my-stable-persistence-id"
|
||||
|
||||
override def receiveRecover: Receive = {
|
||||
case _ => // handle recovery here
|
||||
}
|
||||
|
||||
//#nested-persist-persist
|
||||
override def receiveCommand: Receive = {
|
||||
case c: String =>
|
||||
sender() ! c
|
||||
|
||||
persist(s"$c-1-outer") { outer1 =>
|
||||
sender() ! outer1
|
||||
persist(s"$c-1-inner") { inner1 =>
|
||||
sender() ! inner1
|
||||
}
|
||||
}
|
||||
|
||||
persist(s"$c-2-outer") { outer2 =>
|
||||
sender() ! outer2
|
||||
persist(s"$c-2-inner") { inner2 =>
|
||||
sender() ! inner2
|
||||
}
|
||||
}
|
||||
}
|
||||
//#nested-persist-persist
|
||||
}
|
||||
|
||||
//#nested-persist-persist-caller
|
||||
persistentActor ! "a"
|
||||
persistentActor ! "b"
|
||||
|
||||
// order of received messages:
|
||||
// a
|
||||
// a-outer-1
|
||||
// a-outer-2
|
||||
// a-inner-1
|
||||
// a-inner-2
|
||||
// and only then process "b"
|
||||
// b
|
||||
// b-outer-1
|
||||
// b-outer-2
|
||||
// b-inner-1
|
||||
// b-inner-2
|
||||
|
||||
//#nested-persist-persist-caller
|
||||
|
||||
|
||||
class MyPersistAsyncActor extends PersistentActor {
|
||||
override def persistenceId = "my-stable-persistence-id"
|
||||
|
||||
override def receiveRecover: Receive = {
|
||||
case _ => // handle recovery here
|
||||
}
|
||||
|
||||
//#nested-persistAsync-persistAsync
|
||||
override def receiveCommand: Receive = {
|
||||
case c: String =>
|
||||
sender() ! c
|
||||
persistAsync(c + "-outer-1") { outer ⇒
|
||||
sender() ! outer
|
||||
persistAsync(c + "-inner-1") { inner ⇒ sender() ! inner }
|
||||
}
|
||||
persistAsync(c + "-outer-2") { outer ⇒
|
||||
sender() ! outer
|
||||
persistAsync(c + "-inner-2") { inner ⇒ sender() ! inner }
|
||||
}
|
||||
}
|
||||
//#nested-persistAsync-persistAsync
|
||||
}
|
||||
|
||||
//#nested-persistAsync-persistAsync-caller
|
||||
persistentActor ! "a"
|
||||
persistentActor ! "b"
|
||||
|
||||
// order of received messages:
|
||||
// a
|
||||
// b
|
||||
// a-outer-1
|
||||
// a-outer-2
|
||||
// b-outer-1
|
||||
// b-outer-2
|
||||
// a-inner-1
|
||||
// a-inner-2
|
||||
// b-inner-1
|
||||
// b-inner-2
|
||||
|
||||
// which can be seen as the following causal relationship:
|
||||
// a -> a-outer-1 -> a-outer-2 -> a-inner-1 -> a-inner-2
|
||||
// b -> b-outer-1 -> b-outer-2 -> b-inner-1 -> b-inner-2
|
||||
|
||||
//#nested-persistAsync-persistAsync-caller
|
||||
}
|
||||
|
||||
object View {
|
||||
import akka.actor.Props
|
||||
|
||||
|
|
|
|||
|
|
@ -236,7 +236,7 @@ The ordering between events is still guaranteed ("evt-b-1" will be sent after "e
|
|||
.. _defer-scala:
|
||||
|
||||
Deferring actions until preceding persist handlers have executed
|
||||
-----------------------------------------------------------------
|
||||
----------------------------------------------------------------
|
||||
|
||||
Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of
|
||||
''happens-after the previous ``persistAsync`` handlers have been invoked''. ``PersistentActor`` provides an utility method
|
||||
|
|
@ -259,6 +259,41 @@ The calling side will get the responses in this (guaranteed) order:
|
|||
The callback will not be invoked if the actor is restarted (or stopped) in between the call to
|
||||
``defer`` and the journal has processed and confirmed all preceding writes.
|
||||
|
||||
.. _nested-persist-calls-scala:
|
||||
|
||||
Nested persist calls
|
||||
--------------------
|
||||
It is possible to call ``persist`` and ``persistAsync`` inside their respective callback blocks and they will properly
|
||||
retain both the thread safety (including the right value of ``sender()``) as well as stashing guarantees.
|
||||
|
||||
In general it is encouraged to create command handlers which do not need to resort to nested event persisting,
|
||||
however there are situations where it may be useful. It is important to understand the ordering of callback execution in
|
||||
those situations, as well as their implication on the stashing behaviour (that ``persist()`` enforces). In the following
|
||||
example two persist calls are issued, and each of them issues another persist inside its callback:
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#nested-persist-persist
|
||||
|
||||
When sending two commands to this ``PersistentActor``, the persist handlers will be executed in the following order:
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#nested-persist-persist-caller
|
||||
|
||||
First the "outer layer" of persist calls is issued and their callbacks applied, after these have successfully completed
|
||||
the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal).
|
||||
And only after all these handlers have been successfully invoked, the next command will delivered to the persistent Actor.
|
||||
In other words, the stashing of incoming commands that is guaranteed by initially calling ``persist()`` on the outer layer
|
||||
is extended until all nested ``persist`` callbacks have been handled.
|
||||
|
||||
It is also possible to nest ``persistAsync`` calls, using the same pattern:
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#nested-persistAsync-persistAsync
|
||||
|
||||
In this case no stashing is happening, yet the events are still persisted and callbacks executed in the expected order:
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#nested-persistAsync-persistAsync-caller
|
||||
|
||||
While it is possible to nest mixed ``persist`` and ``persistAsync`` with keeping their respective semantics
|
||||
it is not a recommended practice as it may lead to overly complex nesting.
|
||||
|
||||
Failures
|
||||
--------
|
||||
|
||||
|
|
|
|||
|
|
@ -522,6 +522,44 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
|
|||
}
|
||||
}
|
||||
|
||||
private def flushBatch() {
|
||||
def addToBatch(p: PersistentEnvelope): Unit = p match {
|
||||
case a: AtomicWrite ⇒
|
||||
journalBatch :+= a.copy(payload =
|
||||
a.payload.map(_.update(persistenceId = persistenceId, sequenceNr = nextSequenceNr(), writerUuid = writerUuid)))
|
||||
case r: PersistentEnvelope ⇒
|
||||
journalBatch :+= r
|
||||
}
|
||||
|
||||
def maxBatchSizeReached: Boolean =
|
||||
journalBatch.size >= maxMessageBatchSize
|
||||
|
||||
// When using only `persistAsync` and `defer` max throughput is increased by using
|
||||
// batching, but when using `persist` we want to use one atomic WriteMessages
|
||||
// for the emitted events.
|
||||
// Flush previously collected events, if any, separately from the `persist` batch
|
||||
if (pendingStashingPersistInvocations > 0 && journalBatch.nonEmpty)
|
||||
flushJournalBatch()
|
||||
|
||||
eventBatch.reverse.foreach { p ⇒
|
||||
addToBatch(p)
|
||||
if (!writeInProgress || maxBatchSizeReached) flushJournalBatch()
|
||||
}
|
||||
|
||||
eventBatch = Nil
|
||||
}
|
||||
|
||||
private def peekApplyHandler(payload: Any): Unit = {
|
||||
val batchSizeBeforeApply = eventBatch.size
|
||||
try pendingInvocations.peek().handler(payload)
|
||||
finally {
|
||||
val batchSizeAfterApply = eventBatch.size
|
||||
|
||||
if (batchSizeAfterApply > batchSizeBeforeApply)
|
||||
flushBatch()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Common receive handler for processingCommands and persistingEvents
|
||||
*/
|
||||
|
|
@ -533,7 +571,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
|
|||
if (id == instanceId) {
|
||||
updateLastSequenceNr(p)
|
||||
try {
|
||||
pendingInvocations.peek().handler(p.payload)
|
||||
peekApplyHandler(p.payload)
|
||||
onWriteMessageComplete(err = false)
|
||||
} catch { case NonFatal(e) ⇒ onWriteMessageComplete(err = true); throw e }
|
||||
}
|
||||
|
|
@ -598,22 +636,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
|
|||
internalStash.unstash()
|
||||
}
|
||||
|
||||
private def flushBatch() {
|
||||
// When using only `persistAsync` and `defer` max throughput is increased by using
|
||||
// batching, but when using `persist` we want to use one atomic WriteMessages
|
||||
// for the emitted events.
|
||||
// Flush previously collected events, if any, separately from the `persist` batch
|
||||
if (pendingStashingPersistInvocations > 0 && journalBatch.nonEmpty)
|
||||
flushJournalBatch()
|
||||
|
||||
eventBatch.reverse.foreach { p ⇒
|
||||
addToBatch(p)
|
||||
if (!writeInProgress || maxBatchSizeReached) flushJournalBatch()
|
||||
}
|
||||
|
||||
eventBatch = Nil
|
||||
}
|
||||
|
||||
private def addToBatch(p: PersistentEnvelope): Unit = p match {
|
||||
case a: AtomicWrite ⇒
|
||||
journalBatch :+= a.copy(payload =
|
||||
|
|
|
|||
|
|
@ -7,6 +7,9 @@ package akka.persistence
|
|||
import java.io.File
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
import org.scalatest.matchers.{ MatchResult, Matcher }
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.reflect.ClassTag
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
|
|
@ -18,7 +21,8 @@ import org.scalatest.BeforeAndAfterEach
|
|||
import akka.actor.Props
|
||||
import akka.testkit.AkkaSpec
|
||||
|
||||
abstract class PersistenceSpec(config: Config) extends AkkaSpec(config) with BeforeAndAfterEach with Cleanup { this: AkkaSpec ⇒
|
||||
abstract class PersistenceSpec(config: Config) extends AkkaSpec(config) with BeforeAndAfterEach with Cleanup
|
||||
with PersistenceMatchers { this: AkkaSpec ⇒
|
||||
private var _name: String = _
|
||||
|
||||
lazy val extension = Persistence(system)
|
||||
|
|
@ -87,3 +91,28 @@ trait TurnOffRecoverOnStart { this: Eventsourced ⇒
|
|||
class TestException(msg: String) extends Exception(msg) with NoStackTrace
|
||||
|
||||
case object GetState
|
||||
|
||||
/** Additional ScalaTest matchers useful in persistence tests */
|
||||
trait PersistenceMatchers {
|
||||
/** Use this matcher to verify in-order execution of independent "streams" of events */
|
||||
final class IndependentlyOrdered(prefixes: immutable.Seq[String]) extends Matcher[immutable.Seq[Any]] {
|
||||
override def apply(_left: immutable.Seq[Any]) = {
|
||||
val left = _left.map(_.toString)
|
||||
val mapped = left.groupBy(l ⇒ prefixes.indexWhere(p ⇒ l.startsWith(p))) - (-1) // ignore other messages
|
||||
val results = for {
|
||||
(pos, seq) ← mapped
|
||||
nrs = seq.map(_.replaceFirst(prefixes(pos), "").toInt)
|
||||
sortedNrs = nrs.sorted
|
||||
if nrs != sortedNrs
|
||||
} yield MatchResult(
|
||||
false,
|
||||
s"""Messages sequence with prefix ${prefixes(pos)} was not sorted! Was: $seq"""",
|
||||
s"""Messages sequence with prefix ${prefixes(pos)} was sorted! Was: $seq"""")
|
||||
|
||||
if (results.forall(_.matches)) MatchResult(true, "", "")
|
||||
else results.find(r ⇒ !r.matches).get
|
||||
}
|
||||
}
|
||||
|
||||
def beIndependentlyOrdered(prefixes: String*) = new IndependentlyOrdered(prefixes.toList)
|
||||
}
|
||||
|
|
@ -9,11 +9,13 @@ import java.util.concurrent.atomic.AtomicInteger
|
|||
import akka.actor._
|
||||
import akka.testkit.{ AkkaSpec, ImplicitSender, TestLatch, TestProbe }
|
||||
import com.typesafe.config.Config
|
||||
import org.scalatest.matchers.{ MatchResult, Matcher }
|
||||
|
||||
import scala.collection.immutable.Seq
|
||||
import scala.concurrent.Await
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.{ Future, Await }
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Random
|
||||
import scala.util.{ Try, Random }
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
object PersistentActorSpec {
|
||||
|
|
@ -424,6 +426,130 @@ object PersistentActorSpec {
|
|||
}
|
||||
}
|
||||
|
||||
class MultipleAndNestedPersists(name: String, probe: ActorRef) extends ExamplePersistentActor(name) {
|
||||
val receiveCommand: Receive = {
|
||||
case s: String ⇒
|
||||
probe ! s
|
||||
persist(s + "-outer-1") { outer ⇒
|
||||
probe ! outer
|
||||
persist(s + "-inner-1") { inner ⇒ probe ! inner }
|
||||
}
|
||||
persist(s + "-outer-2") { outer ⇒
|
||||
probe ! outer
|
||||
persist(s + "-inner-2") { inner ⇒ probe ! inner }
|
||||
}
|
||||
}
|
||||
}
|
||||
class MultipleAndNestedPersistAsyncs(name: String, probe: ActorRef) extends ExamplePersistentActor(name) {
|
||||
val receiveCommand: Receive = {
|
||||
case s: String ⇒
|
||||
probe ! s
|
||||
persistAsync(s + "-outer-1") { outer ⇒
|
||||
probe ! outer
|
||||
persistAsync(s + "-inner-1") { inner ⇒ probe ! inner }
|
||||
}
|
||||
persistAsync(s + "-outer-2") { outer ⇒
|
||||
probe ! outer
|
||||
persistAsync(s + "-inner-2") { inner ⇒ probe ! inner }
|
||||
}
|
||||
}
|
||||
}
|
||||
class DeeplyNestedPersistAsyncs(name: String, maxDepth: Int, probe: ActorRef) extends ExamplePersistentActor(name) {
|
||||
var currentDepths = Map.empty[String, Int].withDefaultValue(1)
|
||||
|
||||
def weMustGoDeeper: String ⇒ Unit = { dWithDepth ⇒
|
||||
val d = dWithDepth.split("-").head
|
||||
probe ! dWithDepth
|
||||
if (currentDepths(d) < maxDepth) {
|
||||
currentDepths = currentDepths.updated(d, currentDepths(d) + 1)
|
||||
persistAsync(d + "-" + currentDepths(d))(weMustGoDeeper)
|
||||
} else {
|
||||
// reset depth counter before next command
|
||||
currentDepths = currentDepths.updated(d, 1)
|
||||
}
|
||||
}
|
||||
|
||||
val receiveCommand: Receive = {
|
||||
case s: String ⇒
|
||||
probe ! s
|
||||
persistAsync(s + "-" + 1)(weMustGoDeeper)
|
||||
}
|
||||
}
|
||||
|
||||
class NestedPersistNormalAndAsyncs(name: String, probe: ActorRef) extends ExamplePersistentActor(name) {
|
||||
val receiveCommand: Receive = {
|
||||
case s: String ⇒
|
||||
probe ! s
|
||||
persist(s + "-outer-1") { outer ⇒
|
||||
probe ! outer
|
||||
persistAsync(s + "-inner-async-1") { inner ⇒
|
||||
probe ! inner
|
||||
}
|
||||
}
|
||||
persist(s + "-outer-2") { outer ⇒
|
||||
probe ! outer
|
||||
persistAsync(s + "-inner-async-2") { inner ⇒
|
||||
probe ! inner
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
class NestedPersistAsyncsAndNormal(name: String, probe: ActorRef) extends ExamplePersistentActor(name) {
|
||||
val receiveCommand: Receive = {
|
||||
case s: String ⇒
|
||||
probe ! s
|
||||
persistAsync(s + "-outer-async-1") { outer ⇒
|
||||
probe ! outer
|
||||
persist(s + "-inner-1") { inner ⇒
|
||||
probe ! inner
|
||||
}
|
||||
}
|
||||
persistAsync(s + "-outer-async-2") { outer ⇒
|
||||
probe ! outer
|
||||
persist(s + "-inner-2") { inner ⇒
|
||||
probe ! inner
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
class NestedPersistInAsyncEnforcesStashing(name: String, probe: ActorRef) extends ExamplePersistentActor(name) {
|
||||
val receiveCommand: Receive = {
|
||||
case s: String ⇒
|
||||
probe ! s
|
||||
persistAsync(s + "-outer-async") { outer ⇒
|
||||
probe ! outer
|
||||
persist(s + "-inner") { inner ⇒
|
||||
probe ! inner
|
||||
Thread.sleep(1000) // really long wait here...
|
||||
// the next incoming command must be handled by the following function
|
||||
context.become({ case _ ⇒ sender() ! "done" })
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class DeeplyNestedPersists(name: String, maxDepth: Int, probe: ActorRef) extends ExamplePersistentActor(name) {
|
||||
var currentDepths = Map.empty[String, Int].withDefaultValue(1)
|
||||
|
||||
def weMustGoDeeper: String ⇒ Unit = { dWithDepth ⇒
|
||||
val d = dWithDepth.split("-").head
|
||||
probe ! dWithDepth
|
||||
if (currentDepths(d) < maxDepth) {
|
||||
currentDepths = currentDepths.updated(d, currentDepths(d) + 1)
|
||||
persist(d + "-" + currentDepths(d))(weMustGoDeeper)
|
||||
} else {
|
||||
// reset depth counter before next command
|
||||
currentDepths = currentDepths.updated(d, 1)
|
||||
}
|
||||
}
|
||||
|
||||
val receiveCommand: Receive = {
|
||||
case s: String ⇒
|
||||
probe ! s
|
||||
persist(s + "-" + 1)(weMustGoDeeper)
|
||||
}
|
||||
}
|
||||
|
||||
class StackableTestPersistentActor(val probe: ActorRef) extends StackableTestPersistentActor.BaseActor with PersistentActor with StackableTestPersistentActor.MixinActor {
|
||||
override def persistenceId: String = "StackableTestPersistentActor"
|
||||
|
||||
|
|
@ -891,6 +1017,88 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi
|
|||
|
||||
expectNoMsg(100.millis)
|
||||
}
|
||||
"allow multiple persists with nested persist calls" in {
|
||||
val persistentActor = system.actorOf(Props(classOf[MultipleAndNestedPersists], name, testActor))
|
||||
persistentActor ! "a"
|
||||
persistentActor ! "b"
|
||||
|
||||
expectMsg("a")
|
||||
expectMsg("a-outer-1")
|
||||
expectMsg("a-outer-2")
|
||||
expectMsg("a-inner-1")
|
||||
expectMsg("a-inner-2")
|
||||
// and only then process "b"
|
||||
expectMsg("b")
|
||||
expectMsg("b-outer-1")
|
||||
expectMsg("b-outer-2")
|
||||
expectMsg("b-inner-1")
|
||||
expectMsg("b-inner-2")
|
||||
}
|
||||
"allow multiple persistAsyncs with nested persistAsync calls" in {
|
||||
val persistentActor = system.actorOf(Props(classOf[MultipleAndNestedPersistAsyncs], name, testActor))
|
||||
persistentActor ! "a"
|
||||
persistentActor ! "b"
|
||||
|
||||
val msgs = receiveN(10).map(_.toString)
|
||||
val as = msgs.filter(_ startsWith "a")
|
||||
val bs = msgs.filter(_ startsWith "b")
|
||||
as should equal(List("a", "a-outer-1", "a-outer-2", "a-inner-1", "a-inner-2"))
|
||||
bs should equal(List("b", "b-outer-1", "b-outer-2", "b-inner-1", "b-inner-2"))
|
||||
}
|
||||
"allow deeply nested persist calls" in {
|
||||
val nestedPersists = 6
|
||||
|
||||
val persistentActor = system.actorOf(Props(classOf[DeeplyNestedPersists], name, nestedPersists, testActor))
|
||||
persistentActor ! "a"
|
||||
persistentActor ! "b"
|
||||
|
||||
expectMsg("a")
|
||||
receiveN(6) should ===((1 to nestedPersists).map("a-" + _))
|
||||
// and only then process "b"
|
||||
expectMsg("b")
|
||||
receiveN(6) should ===((1 to nestedPersists).map("b-" + _))
|
||||
}
|
||||
"allow deeply nested persistAsync calls" in {
|
||||
val nestedPersistAsyncs = 6
|
||||
|
||||
val persistentActor = system.actorOf(Props(classOf[DeeplyNestedPersistAsyncs], name, nestedPersistAsyncs, testActor))
|
||||
|
||||
persistentActor ! "a"
|
||||
expectMsg("a")
|
||||
val got = receiveN(nestedPersistAsyncs)
|
||||
got should beIndependentlyOrdered("a-")
|
||||
|
||||
persistentActor ! "b"
|
||||
persistentActor ! "c"
|
||||
val expectedReplies = 2 + (nestedPersistAsyncs * 2)
|
||||
receiveN(expectedReplies).map(_.toString) should beIndependentlyOrdered("b-", "c-")
|
||||
}
|
||||
"allow mixed nesting of persistAsync in persist calls" in {
|
||||
val persistentActor = system.actorOf(Props(classOf[NestedPersistNormalAndAsyncs], name, testActor))
|
||||
persistentActor ! "a"
|
||||
|
||||
expectMsg("a")
|
||||
receiveN(4) should equal(List("a-outer-1", "a-outer-2", "a-inner-async-1", "a-inner-async-2"))
|
||||
}
|
||||
"allow mixed nesting of persist in persistAsync calls" in {
|
||||
val persistentActor = system.actorOf(Props(classOf[NestedPersistAsyncsAndNormal], name, testActor))
|
||||
persistentActor ! "a"
|
||||
|
||||
expectMsg("a")
|
||||
receiveN(4) should equal(List("a-outer-async-1", "a-outer-async-2", "a-inner-1", "a-inner-2"))
|
||||
}
|
||||
"make sure persist retains promised semantics when nested in persistAsync callback xoxo" in {
|
||||
val persistentActor = system.actorOf(Props(classOf[NestedPersistInAsyncEnforcesStashing], name, testActor))
|
||||
persistentActor ! "a"
|
||||
|
||||
expectMsg("a")
|
||||
expectMsg("a-outer-async")
|
||||
expectMsg("a-inner")
|
||||
persistentActor ! "b"
|
||||
expectMsg("done")
|
||||
// which means that b only got applied after the inner persist() handler finished
|
||||
// so it keeps the persist() semantics, even though we should not recommend this style it can come in handy I guess
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import akka.actor.ActorPath;
|
|||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Props;
|
||||
import akka.japi.Procedure;
|
||||
import akka.japi.pf.ReceiveBuilder;
|
||||
import akka.persistence.*;
|
||||
import scala.Option;
|
||||
|
|
@ -34,11 +35,9 @@ public class LambdaPersistenceDocTest {
|
|||
|
||||
static Object o1 = new Object() {
|
||||
|
||||
private void recover() {
|
||||
ActorRef persistentActor =null;
|
||||
|
||||
private void recover(ActorRef persistentActor) {
|
||||
//#recover-explicit
|
||||
persistentActor.tell(Recover.create(), null);
|
||||
persistentActor.tell(Recover.create(), ActorRef.noSender());
|
||||
//#recover-explicit
|
||||
}
|
||||
|
||||
|
|
@ -425,8 +424,118 @@ public class LambdaPersistenceDocTest {
|
|||
}
|
||||
};
|
||||
|
||||
|
||||
static Object o11 = new Object() {
|
||||
|
||||
class MyPersistentActor extends AbstractPersistentActor {
|
||||
@Override
|
||||
public String persistenceId() {
|
||||
return "my-stable-persistence-id";
|
||||
}
|
||||
|
||||
@Override public PartialFunction<Object, BoxedUnit> receiveCommand() {
|
||||
return ReceiveBuilder.matchAny(event -> {}).build();
|
||||
}
|
||||
|
||||
//#nested-persist-persist
|
||||
@Override public PartialFunction<Object, BoxedUnit> receiveRecover() {
|
||||
final Procedure<String> replyToSender = event -> sender().tell(event, self());
|
||||
|
||||
return ReceiveBuilder
|
||||
.match(String.class, msg -> {
|
||||
persist(String.format("%s-outer-1", msg), event -> {
|
||||
sender().tell(event, self());
|
||||
persist(String.format("%s-inner-1", event), replyToSender);
|
||||
});
|
||||
|
||||
persist(String.format("%s-outer-2", msg), event -> {
|
||||
sender().tell(event, self());
|
||||
persist(String.format("%s-inner-2", event), replyToSender);
|
||||
});
|
||||
})
|
||||
.build();
|
||||
}
|
||||
//#nested-persist-persist
|
||||
|
||||
void usage(ActorRef persistentActor) {
|
||||
//#nested-persist-persist-caller
|
||||
persistentActor.tell("a", ActorRef.noSender());
|
||||
persistentActor.tell("b", ActorRef.noSender());
|
||||
|
||||
// order of received messages:
|
||||
// a
|
||||
// a-outer-1
|
||||
// a-outer-2
|
||||
// a-inner-1
|
||||
// a-inner-2
|
||||
// and only then process "b"
|
||||
// b
|
||||
// b-outer-1
|
||||
// b-outer-2
|
||||
// b-inner-1
|
||||
// b-inner-2
|
||||
|
||||
//#nested-persist-persist-caller
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class MyPersistAsyncActor extends AbstractPersistentActor {
|
||||
@Override
|
||||
public String persistenceId() {
|
||||
return "my-stable-persistence-id";
|
||||
}
|
||||
|
||||
@Override public PartialFunction<Object, BoxedUnit> receiveCommand() {
|
||||
return ReceiveBuilder.matchAny(event -> {}).build();
|
||||
}
|
||||
|
||||
//#nested-persistAsync-persistAsync
|
||||
@Override public PartialFunction<Object, BoxedUnit> receiveRecover() {
|
||||
final Procedure<String> replyToSender = event -> sender().tell(event, self());
|
||||
|
||||
return ReceiveBuilder
|
||||
.match(String.class, msg -> {
|
||||
persistAsync(String.format("%s-outer-1", msg ), event -> {
|
||||
sender().tell(event, self());
|
||||
persistAsync(String.format("%s-inner-1", event), replyToSender);
|
||||
});
|
||||
|
||||
persistAsync(String.format("%s-outer-2", msg ), event -> {
|
||||
sender().tell(event, self());
|
||||
persistAsync(String.format("%s-inner-1", event), replyToSender);
|
||||
});
|
||||
})
|
||||
.build();
|
||||
}
|
||||
//#nested-persistAsync-persistAsync
|
||||
|
||||
void usage(ActorRef persistentActor) {
|
||||
//#nested-persistAsync-persistAsync-caller
|
||||
persistentActor.tell("a", self());
|
||||
persistentActor.tell("b", self());
|
||||
|
||||
// order of received messages:
|
||||
// a
|
||||
// b
|
||||
// a-outer-1
|
||||
// a-outer-2
|
||||
// b-outer-1
|
||||
// b-outer-2
|
||||
// a-inner-1
|
||||
// a-inner-2
|
||||
// b-inner-1
|
||||
// b-inner-2
|
||||
|
||||
// which can be seen as the following causal relationship:
|
||||
// a -> a-outer-1 -> a-outer-2 -> a-inner-1 -> a-inner-2
|
||||
// b -> b-outer-1 -> b-outer-2 -> b-inner-1 -> b-inner-2
|
||||
|
||||
//#nested-persistAsync-persistAsync-caller
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
static Object o12 = new Object() {
|
||||
//#view
|
||||
class MyView extends AbstractPersistentView {
|
||||
@Override public String persistenceId() { return "some-persistence-id"; }
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue