doc: clarify EventSourcedBehavior effects, #25953

This commit is contained in:
Patrik Nordwall 2019-10-09 14:06:29 +02:00 committed by Johan Andrén
parent a9d1572999
commit 76e2730195
5 changed files with 204 additions and 54 deletions

View file

@ -127,23 +127,19 @@ illustrates how to construct the `PersistenceId` from the `entityTypeKey` and `e
The command handler is a function with 2 parameters, the current `State` and the incoming `Command`.
A command handler returns an `Effect` directive that defines what event or events, if any, to persist.
Effects are created using @java[a factory that is returned via the `Effect()` method] @scala[the `Effect` factory]
and can be one of:
Effects are created using @java[a factory that is returned via the `Effect()` method] @scala[the `Effect` factory].
The two most commonly used effects are:
* `persist` will persist one single event or several events atomically, i.e. all events
are stored or none of them are stored if there is an error
* `none` no events are to be persisted, for example a read-only command
* `unhandled` the command is unhandled (not supported) in current state
* `stop` stop this actor
More effects are explained in @ref:[Effects and Side Effects](#effects-and-side-effects).
In addition to returning the primary `Effect` for the command `EventSourcedBehavior`s can also
chain side effects (`SideEffect`s) are to be performed after successful persist which is achieved with the `andThen` and `thenRun`
function e.g @scala[`Effect.persist(..).andThen`]@java[`Effect().persist(..).andThen`]. The `thenRun` function
is a convenience around creating a `SideEffect`.
In the example below a reply is sent to the `replyTo` ActorRef. Note that the new state after applying
the event is passed as parameter to the `thenRun` function. All `thenRun` registered callbacks
are executed sequentially after successful execution of the persist statement (or immediately, in case of `none` and `unhandled`).
chain side effects that are to be performed after successful persist which is achieved with the `thenRun`
function e.g @scala[`Effect.persist(..).thenRun`]@java[`Effect().persist(..).thenRun`].
### Event handler
@ -206,6 +202,82 @@ Scala
Java
: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #behavior }
## Effects and Side Effects
A command handler returns an `Effect` directive that defines what event or events, if any, to persist.
Effects are created using @java[a factory that is returned via the `Effect()` method] @scala[the `Effect` factory]
and can be one of:
* `persist` will persist one single event or several events atomically, i.e. all events
are stored or none of them are stored if there is an error
* `none` no events are to be persisted, for example a read-only command
* `unhandled` the command is unhandled (not supported) in current state
* `stop` stop this actor
* `stash` the current command is stashed
* `unstashAll` process the commands that were stashed with @scala[`Effect.stash`]@java[`Effect().stash`]
* `reply` send a reply message to the given `ActorRef`
Note that only one of those can be chosen per incoming command. It is not possible to both persist and say none/unhandled.
In addition to returning the primary `Effect` for the command `EventSourcedBehavior`s can also
chain side effects that are to be performed after successful persist which is achieved with the `thenRun`
function e.g @scala[`Effect.persist(..).thenRun`]@java[`Effect().persist(..).thenRun`].
In the example below the state is sent to the `subscriber` ActorRef. Note that the new state after applying
the event is passed as parameter of the `thenRun` function.
All `thenRun` registered callbacks are executed sequentially after successful execution of the persist statement
(or immediately, in case of `none` and `unhandled`).
In addition to `thenRun` the following actions can also be performed after successful persist:
* `thenStop` the actor will be stopped
* `thenUnstashAll` process the commands that were stashed with @scala[`Effect.stash`]@java[`Effect().stash`]
* `thenReply` send a reply message to the given `ActorRef`
Example of effects:
Scala
: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #effects }
Java
: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #effects }
Most of the time this will be done with the `thenRun` method on the `Effect` above. You can factor out
common side effects into functions and reuse for several commands. For example:
Scala
: @@snip [PersistentActorCompileOnlyTest.scala](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #commonChainedEffects }
Java
: @@snip [PersistentActorCompileOnlyTest.java](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #commonChainedEffects }
### Side effects ordering and guarantees
Any side effects are executed on an at-most-once basis and will not be executed if the persist fails.
Side effects are not run when the actor is restarted or started again after being stopped.
You may inspect the state when receiving the `RecoveryCompleted` signal and execute side effects that
have not been acknowledged at that point. That may possibly result in executing side effects more than once.
The side effects are executed sequentially, it is not possible to execute side effects in parallel, unless they
call out to something that is running concurrently (for example sending a message to another actor).
It's possible to execute a side effects before persisting the event, but that can result in that the
side effect is performed but the event is not stored if the persist fails.
### Atomic writes
It is possible to store several events atomically by using the `persist` effect with a list of events.
That means that all events passed to that method are stored or none of them are stored if there is an error.
The recovery of a persistent actor will therefore never be done partially with only a subset of events persisted by
a single `persist` effect.
Some journals may not support atomic writes of several events and they will then reject the `persist` with
multiple events. This is signalled to a `EventSourcedBehavior` via a `EventRejectedException` (typically with a
`UnsupportedOperationException`) and can be handled with a @ref[supervisor](fault-tolerance.md).
## Cluster Sharding and EventSourcedBehavior
In a use case where the number of persistent actors needed is higher than what would fit in the memory of one node or
@ -231,8 +303,6 @@ Scala
Java
: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #actor-context }
## Changing Behavior
After processing a message, actors are able to return the `Behavior` that is used
@ -304,45 +374,6 @@ illustrated in @ref:[event handlers in the state](persistence-style.md#event-han
There is also an example illustrating an @ref:[optional initial state](persistence-style.md#optional-initial-state).
## Effects and Side Effects
Each command has a single `Effect` which can be:
* Persist events
* None: Accept the command but no effects
* Unhandled: Don't handle this command
* Stash: the current command is placed in a buffer and can be unstashed and processed later
Note that there is only one of these. It is not possible to both persist and say none/unhandled.
These are created using @java[a factory that is returned via the `Effect()` method]
@scala[the `Effect` factory] and once created additional side effects can be added.
Most of the time this will be done with the `thenRun` method on the `Effect` above. You can factor out
common side effects into functions and reuse for several commands. For example:
Scala
: @@snip [PersistentActorCompileOnlyTest.scala](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #commonChainedEffects }
Java
: @@snip [PersistentActorCompileOnlyTest.java](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #commonChainedEffects }
### Side effects ordering and guarantees
Any side effects are executed on an at-once basis and will not be executed if the persist fails.
The side effects are executed sequentially, it is not possible to execute side effects in parallel.
### Atomic writes
It is possible to store several events atomically by using the `persistAll` effect. That means that all events
passed to that method are stored or none of them are stored if there is an error.
The recovery of a persistent actor will therefore never be done partially with only a subset of events persisted by
`persistAll`.
Some journals may not support atomic writes of several events and they will then reject the `persistAll`
command. This is signalled to a `EventSourcedBehavior` via a `EventRejectedException` (typically with a
`UnsupportedOperationException`) and can be handled with a @ref[supervisor](fault-tolerance.md).
## Replies
The @ref:[Request-Response interaction pattern](interaction-patterns.md#request-response) is very common for

View file

@ -11,7 +11,7 @@ import akka.annotation.InternalApi
* A [[SideEffect]] is an side effect that can be chained after a main effect.
*
* Persist, none and unhandled are main effects. Then any number of
* call backs can be added to these effects with `andThen`.
* call backs can be added to these effects.
*
* INTERNAL API
*/

View file

@ -68,7 +68,7 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] (
* in order to keep updating the state state.
*
* For that reason it is strongly discouraged to perform side-effects in this handler;
* Side effects should be executed in `andThen` or `recoveryCompleted` blocks.
* Side effects should be executed in `thenRun` or `recoveryCompleted` blocks.
*/
protected def eventHandler(): EventHandler[State, Event]

View file

@ -15,6 +15,7 @@ import akka.persistence.typed.RecoveryCompleted;
import akka.persistence.typed.SnapshotFailed;
import akka.persistence.typed.SnapshotSelectionCriteria;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.Effect;
import akka.persistence.typed.javadsl.EventHandler;
// #behavior
import akka.persistence.typed.javadsl.EventSourcedBehavior;
@ -176,6 +177,112 @@ public class BasicPersistentBehaviorTest {
}
interface Effects {
public class MyPersistentBehavior
extends EventSourcedBehavior<
MyPersistentBehavior.Command, MyPersistentBehavior.Event, MyPersistentBehavior.State> {
interface Command {}
public static class Add implements Command {
public final String data;
public Add(String data) {
this.data = data;
}
}
public enum Clear implements Command {
INSTANCE
}
interface Event {}
public static class Added implements Event {
public final String data;
public Added(String data) {
this.data = data;
}
}
public enum Cleared implements Event {
INSTANCE
}
public static class State {
private final List<String> items;
private State(List<String> items) {
this.items = items;
}
public State() {
this.items = new ArrayList<>();
}
public State addItem(String data) {
List<String> newItems = new ArrayList<>(items);
newItems.add(0, data);
// keep 5 items
List<String> latest = newItems.subList(0, Math.min(4, newItems.size() - 1));
return new State(latest);
}
}
public static Behavior<Command> create(
PersistenceId persistenceId, ActorRef<State> subscriber) {
return new MyPersistentBehavior(persistenceId, subscriber);
}
private MyPersistentBehavior(PersistenceId persistenceId, ActorRef<State> subscriber) {
super(persistenceId);
this.subscriber = subscriber;
}
@Override
public State emptyState() {
return new State();
}
// #effects
private final ActorRef<State> subscriber;
@Override
public CommandHandler<Command, Event, State> commandHandler() {
return newCommandHandlerBuilder()
.forAnyState()
.onCommand(Add.class, this::onAdd)
.onCommand(Clear.class, this::onClear)
.build();
}
private Effect<Event, State> onAdd(Add command) {
return Effect()
.persist(new Added(command.data))
.thenRun(newState -> subscriber.tell(newState));
}
private Effect<Event, State> onClear(Clear command) {
return Effect()
.persist(Cleared.INSTANCE)
.thenRun(newState -> subscriber.tell(newState))
.thenStop();
}
// #effects
@Override
public EventHandler<State, Event> eventHandler() {
return newEventHandlerBuilder()
.forAnyState()
.onEvent(Added.class, (state, event) -> state.addItem(event.data))
.onEvent(Cleared.class, () -> new State())
.build();
}
}
}
interface More {
// #supervision

View file

@ -6,6 +6,7 @@ package docs.akka.persistence.typed
import scala.concurrent.duration._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors
@ -57,6 +58,17 @@ object BasicPersistentBehaviorCompileOnly {
}
//#command-handler
//#effects
def onCommand(subscriber: ActorRef[State], state: State, command: Command): Effect[Event, State] = {
command match {
case Add(data) =>
Effect.persist(Added(data)).thenRun(newState => subscriber ! newState)
case Clear =>
Effect.persist(Cleared).thenRun((newState: State) => subscriber ! newState).thenStop()
}
}
//#effects
//#event-handler
val eventHandler: (State, Event) => State = { (state, event) =>
event match {