Merge pull request #19829 from MQ-EL/master
=doc #19551 Supplement java8-lambda docs of persistence stash
This commit is contained in:
commit
a07b9ca7aa
6 changed files with 67 additions and 86 deletions
|
|
@ -156,9 +156,10 @@ private[akka] trait StashSupport {
|
||||||
def stash(): Unit = {
|
def stash(): Unit = {
|
||||||
val currMsg = actorCell.currentMessage
|
val currMsg = actorCell.currentMessage
|
||||||
if (theStash.nonEmpty && (currMsg eq theStash.last))
|
if (theStash.nonEmpty && (currMsg eq theStash.last))
|
||||||
throw new IllegalStateException("Can't stash the same message " + currMsg + " more than once")
|
throw new IllegalStateException(s"Can't stash the same message $currMsg more than once")
|
||||||
if (capacity <= 0 || theStash.size < capacity) theStash :+= currMsg
|
if (capacity <= 0 || theStash.size < capacity) theStash :+= currMsg
|
||||||
else throw new StashOverflowException("Couldn't enqueue message " + currMsg.getClass.getName + " to stash of " + self)
|
else throw new StashOverflowException(
|
||||||
|
s"Couldn't enqueue message ${currMsg.message.getClass.getName} from ${currMsg.sender} to stash of $self")
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -789,7 +789,7 @@ behavior is not the default).
|
||||||
|
|
||||||
.. includecode:: code/docs/actorlambda/ActorDocTest.java#swapper
|
.. includecode:: code/docs/actorlambda/ActorDocTest.java#swapper
|
||||||
|
|
||||||
.. _stash-lambda-java:
|
.. _stash-lambda:
|
||||||
|
|
||||||
Stash
|
Stash
|
||||||
=====
|
=====
|
||||||
|
|
|
||||||
|
|
@ -117,20 +117,8 @@ about successful state changes by publishing events.
|
||||||
|
|
||||||
When persisting events with ``persist`` it is guaranteed that the persistent actor will not receive further commands between
|
When persisting events with ``persist`` it is guaranteed that the persistent actor will not receive further commands between
|
||||||
the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist``
|
the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist``
|
||||||
calls in context of a single command. Incoming messages are :ref:`stashed <stash-lambda-java>` until the ``persist``
|
calls in context of a single command. Incoming messages are :ref:`stashed <internal-stash-lambda>` until the ``persist``
|
||||||
is completed. You should be careful to not send more messages to a persistent actor than it can keep up with,
|
is completed.
|
||||||
otherwise the number of stashed messages will grow. It can be wise to protect against `OutOfMemoryError`
|
|
||||||
by defining a maximum stash capacity in the mailbox configuration::
|
|
||||||
|
|
||||||
akka.actor.default-mailbox.stash-capacity=10000
|
|
||||||
|
|
||||||
If the stash capacity is exceeded for an actor the stashed messages are discarded and a
|
|
||||||
``MessageQueueAppendFailedException`` is thrown, causing actor restart if default supervision
|
|
||||||
strategy is used.
|
|
||||||
|
|
||||||
Note that the stash capacity is per actor. If you have many persistent actors, e.g. when using cluster sharding,
|
|
||||||
you may need to define a small stash capacity to ensure that the total number of stashed messages in the system
|
|
||||||
don't consume too much memory.
|
|
||||||
|
|
||||||
If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default),
|
If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default),
|
||||||
and the actor will unconditionally be stopped. If persistence of an event is rejected before it is
|
and the actor will unconditionally be stopped. If persistence of an event is rejected before it is
|
||||||
|
|
@ -202,6 +190,47 @@ and before any other received messages.
|
||||||
If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure``
|
If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure``
|
||||||
is called (logging the error by default), and the actor will be stopped.
|
is called (logging the error by default), and the actor will be stopped.
|
||||||
|
|
||||||
|
.. _internal-stash-lambda:
|
||||||
|
|
||||||
|
Internal stash
|
||||||
|
--------------
|
||||||
|
|
||||||
|
The persistent actor has a private :ref:`stash <stash-lambda>` for internally caching incoming messages during
|
||||||
|
:ref:`recovery <recovery-java-lambda>` or the ``persist\persistAll`` method persisting events. You can still
|
||||||
|
use/inherit from the ``Stash`` interface. The internal stash cooperates with the normal stash by hooking into
|
||||||
|
``unstashAll`` method and making sure messages are unstashed properly to the internal stash to maintain ordering
|
||||||
|
guarantees.
|
||||||
|
|
||||||
|
You should be careful to not send more messages to a persistent actor than it can keep up with, otherwise the number
|
||||||
|
of stashed messages will grow without bounds. It can be wise to protect against ``OutOfMemoryError`` by defining a
|
||||||
|
maximum stash capacity in the mailbox configuration::
|
||||||
|
|
||||||
|
akka.actor.default-mailbox.stash-capacity=10000
|
||||||
|
|
||||||
|
Note that the stash capacity is per actor. If you have many persistent actors, e.g. when using cluster sharding,
|
||||||
|
you may need to define a small stash capacity to ensure that the total number of stashed messages in the system
|
||||||
|
don't consume too much memory. Additionally, The persistent actor defines three strategies to handle failure when the
|
||||||
|
internal stash capacity is exceeded. The default overflow strategy is the ``ThrowOverflowExceptionStrategy``, which
|
||||||
|
discards the current received message and throws a ``StashOverflowException``, causing actor restart if default
|
||||||
|
supervision strategy is used. you can override the ``internalStashOverflowStrategy`` method to return
|
||||||
|
``DiscardToDeadLetterStrategy`` or ``ReplyToStrategy`` for any "individual" persistent actor, or define the "default"
|
||||||
|
for all persistent actors by providing FQCN, which must be a subclass of ``StashOverflowStrategyConfigurator``, in the
|
||||||
|
persistence configuration::
|
||||||
|
|
||||||
|
akka.persistence.internal-stash-overflow-strategy=
|
||||||
|
"akka.persistence.ThrowExceptionConfigurator"
|
||||||
|
|
||||||
|
The ``DiscardToDeadLetterStrategy`` strategy also has a pre-packaged companion configurator
|
||||||
|
``akka.persistence.DiscardConfigurator``.
|
||||||
|
|
||||||
|
You can also query default strategy via the Akka persistence extension singleton::
|
||||||
|
|
||||||
|
Persistence.get(context().system()).defaultInternalStashOverflowStrategy();
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
The bounded mailbox should be avoided in the persistent actor, by which the messages come from storage backends may
|
||||||
|
be discarded. You can use bounded stash instead of it.
|
||||||
|
|
||||||
|
|
||||||
Relaxed local consistency requirements and high throughput use-cases
|
Relaxed local consistency requirements and high throughput use-cases
|
||||||
--------------------------------------------------------------------
|
--------------------------------------------------------------------
|
||||||
|
|
|
||||||
|
|
@ -118,7 +118,7 @@ about successful state changes by publishing events.
|
||||||
|
|
||||||
When persisting events with ``persist`` it is guaranteed that the persistent actor will not receive further commands between
|
When persisting events with ``persist`` it is guaranteed that the persistent actor will not receive further commands between
|
||||||
the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist``
|
the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist``
|
||||||
calls in context of a single command. Incoming messages are :ref:`stashed <internal-stash_java>` until the ``persist``
|
calls in context of a single command. Incoming messages are :ref:`stashed <internal-stash-java>` until the ``persist``
|
||||||
is completed.
|
is completed.
|
||||||
|
|
||||||
If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default),
|
If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default),
|
||||||
|
|
@ -192,21 +192,19 @@ and before any other received messages.
|
||||||
If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure``
|
If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure``
|
||||||
is called (logging the error by default) and the actor will be stopped.
|
is called (logging the error by default) and the actor will be stopped.
|
||||||
|
|
||||||
.. _internal-stash_java:
|
.. _internal-stash-java:
|
||||||
|
|
||||||
Internal stash
|
Internal stash
|
||||||
--------------
|
--------------
|
||||||
|
|
||||||
The persistent actor has a private :ref:`stash <stash-scala>` for internally caching incoming messages during
|
The persistent actor has a private :ref:`stash <stash-java>` for internally caching incoming messages during
|
||||||
:ref:`recovery` or the ``persist\persistAll`` method persisting events. However You can use inherited stash or create
|
:ref:`recovery <recovery-java>` or the ``persist\persistAll`` method persisting events. You can still use/inherit
|
||||||
one or more stashes if needed. The internal stash doesn't interfere with these stashes apart from user inherited
|
from the ``Stash`` interface. The internal stash cooperates with the normal stash by hooking into ``unstashAll``
|
||||||
``unstashAll`` method, which prepends all messages in the inherited stash to the internal stash instead of mailbox.
|
method and making sure messages are unstashed properly to the internal stash to maintain ordering guarantees.
|
||||||
Hence, If the message in the inherited stash need to be handled after the messages in the internal stash, you should
|
|
||||||
call inherited ``unstash`` method.
|
|
||||||
|
|
||||||
You should be careful to not send more messages to a persistent actor than it can keep up with, otherwise the number
|
You should be careful to not send more messages to a persistent actor than it can keep up with, otherwise the number
|
||||||
of stashed messages will grow. It can be wise to protect against `OutOfMemoryError` by defining a maximum stash
|
of stashed messages will grow without bounds. It can be wise to protect against ``OutOfMemoryError`` by defining a
|
||||||
capacity in the mailbox configuration::
|
maximum stash capacity in the mailbox configuration::
|
||||||
|
|
||||||
akka.actor.default-mailbox.stash-capacity=10000
|
akka.actor.default-mailbox.stash-capacity=10000
|
||||||
|
|
||||||
|
|
@ -228,11 +226,11 @@ The ``DiscardToDeadLetterStrategy`` strategy also has a pre-packaged companion c
|
||||||
|
|
||||||
You can also query default strategy via the Akka persistence extension singleton::
|
You can also query default strategy via the Akka persistence extension singleton::
|
||||||
|
|
||||||
Persistence.get(context().system()).defaultInternalStashOverflowStrategy()
|
Persistence.get(context().system()).defaultInternalStashOverflowStrategy();
|
||||||
|
|
||||||
.. note::
|
.. note::
|
||||||
The bounded mailbox should be avoid in the persistent actor, because it may be discarding the messages come from
|
The bounded mailbox should be avoided in the persistent actor, by which the messages come from storage backends may
|
||||||
Storage backends. You can use bounded stash instead of bounded mailbox.
|
be discarded. You can use bounded stash instead of it.
|
||||||
|
|
||||||
|
|
||||||
.. _persist-async-java:
|
.. _persist-async-java:
|
||||||
|
|
|
||||||
|
|
@ -414,51 +414,6 @@ object PersistenceDocSpec {
|
||||||
// Shutdown
|
// Shutdown
|
||||||
// -- stop --
|
// -- stop --
|
||||||
//#safe-shutdown-example-good
|
//#safe-shutdown-example-good
|
||||||
|
|
||||||
class MyPersistAsyncActor extends PersistentActor {
|
|
||||||
override def persistenceId = "my-stable-persistence-id"
|
|
||||||
|
|
||||||
override def receiveRecover: Receive = {
|
|
||||||
case _ => // handle recovery here
|
|
||||||
}
|
|
||||||
|
|
||||||
//#nested-persistAsync-persistAsync
|
|
||||||
override def receiveCommand: Receive = {
|
|
||||||
case c: String =>
|
|
||||||
sender() ! c
|
|
||||||
persistAsync(c + "-outer-1") { outer ⇒
|
|
||||||
sender() ! outer
|
|
||||||
persistAsync(c + "-inner-1") { inner ⇒ sender() ! inner }
|
|
||||||
}
|
|
||||||
persistAsync(c + "-outer-2") { outer ⇒
|
|
||||||
sender() ! outer
|
|
||||||
persistAsync(c + "-inner-2") { inner ⇒ sender() ! inner }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//#nested-persistAsync-persistAsync
|
|
||||||
}
|
|
||||||
|
|
||||||
//#nested-persistAsync-persistAsync-caller
|
|
||||||
persistentActor ! "a"
|
|
||||||
persistentActor ! "b"
|
|
||||||
|
|
||||||
// order of received messages:
|
|
||||||
// a
|
|
||||||
// b
|
|
||||||
// a-outer-1
|
|
||||||
// a-outer-2
|
|
||||||
// b-outer-1
|
|
||||||
// b-outer-2
|
|
||||||
// a-inner-1
|
|
||||||
// a-inner-2
|
|
||||||
// b-inner-1
|
|
||||||
// b-inner-2
|
|
||||||
|
|
||||||
// which can be seen as the following causal relationship:
|
|
||||||
// a -> a-outer-1 -> a-outer-2 -> a-inner-1 -> a-inner-2
|
|
||||||
// b -> b-outer-1 -> b-outer-2 -> b-inner-1 -> b-inner-2
|
|
||||||
|
|
||||||
//#nested-persistAsync-persistAsync-caller
|
|
||||||
}
|
}
|
||||||
|
|
||||||
object View {
|
object View {
|
||||||
|
|
|
||||||
|
|
@ -102,7 +102,7 @@ about successful state changes by publishing events.
|
||||||
|
|
||||||
When persisting events with ``persist`` it is guaranteed that the persistent actor will not receive further commands between
|
When persisting events with ``persist`` it is guaranteed that the persistent actor will not receive further commands between
|
||||||
the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist``
|
the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist``
|
||||||
calls in context of a single command. Incoming messages are :ref:`stashed <internal-stash_scala>` until the ``persist``
|
calls in context of a single command. Incoming messages are :ref:`stashed <internal-stash-scala>` until the ``persist``
|
||||||
is completed.
|
is completed.
|
||||||
|
|
||||||
If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default),
|
If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default),
|
||||||
|
|
@ -175,21 +175,19 @@ and before any other received messages.
|
||||||
If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure``
|
If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure``
|
||||||
is called (logging the error by default) and the actor will be stopped.
|
is called (logging the error by default) and the actor will be stopped.
|
||||||
|
|
||||||
.. _internal-stash_scala:
|
.. _internal-stash-scala:
|
||||||
|
|
||||||
Internal stash
|
Internal stash
|
||||||
--------------
|
--------------
|
||||||
|
|
||||||
The persistent actor has a private :ref:`stash <stash-scala>` for internally caching incoming messages during
|
The persistent actor has a private :ref:`stash <stash-scala>` for internally caching incoming messages during
|
||||||
:ref:`recovery` or the ``persist\persistAll`` method persisting events. However You can use inherited stash or create
|
:ref:`recovery <recovery>` or the ``persist\persistAll`` method persisting events. You can still use/inherit from the
|
||||||
one or more stashes if needed. The internal stash doesn't interfere with these stashes apart from user inherited
|
``Stash`` interface. The internal stash cooperates with the normal stash by hooking into ``unstashAll`` method and
|
||||||
``unstashAll`` method, which prepends all messages in the inherited stash to the internal stash instead of mailbox.
|
making sure messages are unstashed properly to the internal stash to maintain ordering guarantees.
|
||||||
Hence, If the message in the inherited stash need to be handled after the messages in the internal stash, you should
|
|
||||||
call inherited ``unstash`` method.
|
|
||||||
|
|
||||||
You should be careful to not send more messages to a persistent actor than it can keep up with, otherwise the number
|
You should be careful to not send more messages to a persistent actor than it can keep up with, otherwise the number
|
||||||
of stashed messages will grow. It can be wise to protect against `OutOfMemoryError` by defining a maximum stash
|
of stashed messages will grow without bounds. It can be wise to protect against ``OutOfMemoryError`` by defining a
|
||||||
capacity in the mailbox configuration::
|
maximum stash capacity in the mailbox configuration::
|
||||||
|
|
||||||
akka.actor.default-mailbox.stash-capacity=10000
|
akka.actor.default-mailbox.stash-capacity=10000
|
||||||
|
|
||||||
|
|
@ -214,8 +212,8 @@ You can also query default strategy via the Akka persistence extension singleton
|
||||||
Persistence(context.system).defaultInternalStashOverflowStrategy
|
Persistence(context.system).defaultInternalStashOverflowStrategy
|
||||||
|
|
||||||
.. note::
|
.. note::
|
||||||
The bounded mailbox should be avoid in the persistent actor, because it may be discarding the messages come from
|
The bounded mailbox should be avoided in the persistent actor, by which the messages come from storage backends may
|
||||||
Storage backends. You can use bounded stash instead of bounded mailbox.
|
be discarded. You can use bounded stash instead of it.
|
||||||
|
|
||||||
.. _persist-async-scala:
|
.. _persist-async-scala:
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue