diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index ff168fd922..41652d6c97 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -127,23 +127,19 @@ illustrates how to construct the `PersistenceId` from the `entityTypeKey` and `e The command handler is a function with 2 parameters, the current `State` and the incoming `Command`. A command handler returns an `Effect` directive that defines what event or events, if any, to persist. -Effects are created using @java[a factory that is returned via the `Effect()` method] @scala[the `Effect` factory] -and can be one of: +Effects are created using @java[a factory that is returned via the `Effect()` method] @scala[the `Effect` factory]. + +The two most commonly used effects are: * `persist` will persist one single event or several events atomically, i.e. all events are stored or none of them are stored if there is an error * `none` no events are to be persisted, for example a read-only command -* `unhandled` the command is unhandled (not supported) in current state -* `stop` stop this actor + +More effects are explained in @ref:[Effects and Side Effects](#effects-and-side-effects). In addition to returning the primary `Effect` for the command `EventSourcedBehavior`s can also -chain side effects (`SideEffect`s) are to be performed after successful persist which is achieved with the `andThen` and `thenRun` -function e.g @scala[`Effect.persist(..).andThen`]@java[`Effect().persist(..).andThen`]. The `thenRun` function -is a convenience around creating a `SideEffect`. - -In the example below a reply is sent to the `replyTo` ActorRef. Note that the new state after applying -the event is passed as parameter to the `thenRun` function. All `thenRun` registered callbacks -are executed sequentially after successful execution of the persist statement (or immediately, in case of `none` and `unhandled`). +chain side effects that are to be performed after successful persist which is achieved with the `thenRun` +function e.g @scala[`Effect.persist(..).thenRun`]@java[`Effect().persist(..).thenRun`]. ### Event handler @@ -206,6 +202,82 @@ Scala Java : @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #behavior } +## Effects and Side Effects + +A command handler returns an `Effect` directive that defines what event or events, if any, to persist. +Effects are created using @java[a factory that is returned via the `Effect()` method] @scala[the `Effect` factory] +and can be one of: + +* `persist` will persist one single event or several events atomically, i.e. all events + are stored or none of them are stored if there is an error +* `none` no events are to be persisted, for example a read-only command +* `unhandled` the command is unhandled (not supported) in current state +* `stop` stop this actor +* `stash` the current command is stashed +* `unstashAll` process the commands that were stashed with @scala[`Effect.stash`]@java[`Effect().stash`] +* `reply` send a reply message to the given `ActorRef` + +Note that only one of those can be chosen per incoming command. It is not possible to both persist and say none/unhandled. + +In addition to returning the primary `Effect` for the command `EventSourcedBehavior`s can also +chain side effects that are to be performed after successful persist which is achieved with the `thenRun` +function e.g @scala[`Effect.persist(..).thenRun`]@java[`Effect().persist(..).thenRun`]. + +In the example below the state is sent to the `subscriber` ActorRef. Note that the new state after applying +the event is passed as parameter of the `thenRun` function. + +All `thenRun` registered callbacks are executed sequentially after successful execution of the persist statement +(or immediately, in case of `none` and `unhandled`). + +In addition to `thenRun` the following actions can also be performed after successful persist: + +* `thenStop` the actor will be stopped +* `thenUnstashAll` process the commands that were stashed with @scala[`Effect.stash`]@java[`Effect().stash`] +* `thenReply` send a reply message to the given `ActorRef` + +Example of effects: + +Scala +: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #effects } + +Java +: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #effects } + +Most of the time this will be done with the `thenRun` method on the `Effect` above. You can factor out +common side effects into functions and reuse for several commands. For example: + +Scala +: @@snip [PersistentActorCompileOnlyTest.scala](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #commonChainedEffects } + +Java +: @@snip [PersistentActorCompileOnlyTest.java](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #commonChainedEffects } + +### Side effects ordering and guarantees + +Any side effects are executed on an at-most-once basis and will not be executed if the persist fails. + +Side effects are not run when the actor is restarted or started again after being stopped. +You may inspect the state when receiving the `RecoveryCompleted` signal and execute side effects that +have not been acknowledged at that point. That may possibly result in executing side effects more than once. + +The side effects are executed sequentially, it is not possible to execute side effects in parallel, unless they +call out to something that is running concurrently (for example sending a message to another actor). + +It's possible to execute a side effects before persisting the event, but that can result in that the +side effect is performed but the event is not stored if the persist fails. + +### Atomic writes + +It is possible to store several events atomically by using the `persist` effect with a list of events. +That means that all events passed to that method are stored or none of them are stored if there is an error. + +The recovery of a persistent actor will therefore never be done partially with only a subset of events persisted by +a single `persist` effect. + +Some journals may not support atomic writes of several events and they will then reject the `persist` with +multiple events. This is signalled to a `EventSourcedBehavior` via a `EventRejectedException` (typically with a +`UnsupportedOperationException`) and can be handled with a @ref[supervisor](fault-tolerance.md). + ## Cluster Sharding and EventSourcedBehavior In a use case where the number of persistent actors needed is higher than what would fit in the memory of one node or @@ -231,8 +303,6 @@ Scala Java : @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #actor-context } - - ## Changing Behavior After processing a message, actors are able to return the `Behavior` that is used @@ -304,45 +374,6 @@ illustrated in @ref:[event handlers in the state](persistence-style.md#event-han There is also an example illustrating an @ref:[optional initial state](persistence-style.md#optional-initial-state). -## Effects and Side Effects - -Each command has a single `Effect` which can be: - -* Persist events -* None: Accept the command but no effects -* Unhandled: Don't handle this command -* Stash: the current command is placed in a buffer and can be unstashed and processed later - -Note that there is only one of these. It is not possible to both persist and say none/unhandled. -These are created using @java[a factory that is returned via the `Effect()` method] -@scala[the `Effect` factory] and once created additional side effects can be added. - -Most of the time this will be done with the `thenRun` method on the `Effect` above. You can factor out -common side effects into functions and reuse for several commands. For example: - -Scala -: @@snip [PersistentActorCompileOnlyTest.scala](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #commonChainedEffects } - -Java -: @@snip [PersistentActorCompileOnlyTest.java](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #commonChainedEffects } - -### Side effects ordering and guarantees - -Any side effects are executed on an at-once basis and will not be executed if the persist fails. -The side effects are executed sequentially, it is not possible to execute side effects in parallel. - -### Atomic writes - -It is possible to store several events atomically by using the `persistAll` effect. That means that all events -passed to that method are stored or none of them are stored if there is an error. - -The recovery of a persistent actor will therefore never be done partially with only a subset of events persisted by -`persistAll`. - -Some journals may not support atomic writes of several events and they will then reject the `persistAll` -command. This is signalled to a `EventSourcedBehavior` via a `EventRejectedException` (typically with a -`UnsupportedOperationException`) and can be handled with a @ref[supervisor](fault-tolerance.md). - ## Replies The @ref:[Request-Response interaction pattern](interaction-patterns.md#request-response) is very common for diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/SideEffect.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/SideEffect.scala index 6e1e83499f..ee47f5ee67 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/SideEffect.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/SideEffect.scala @@ -11,7 +11,7 @@ import akka.annotation.InternalApi * A [[SideEffect]] is an side effect that can be chained after a main effect. * * Persist, none and unhandled are main effects. Then any number of - * call backs can be added to these effects with `andThen`. + * call backs can be added to these effects. * * INTERNAL API */ diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala index 0ce8dc376e..4fac5c7681 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala @@ -68,7 +68,7 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] ( * in order to keep updating the state state. * * For that reason it is strongly discouraged to perform side-effects in this handler; - * Side effects should be executed in `andThen` or `recoveryCompleted` blocks. + * Side effects should be executed in `thenRun` or `recoveryCompleted` blocks. */ protected def eventHandler(): EventHandler[State, Event] diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java index 96e2ddcd0b..d56c8a4280 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java @@ -15,6 +15,7 @@ import akka.persistence.typed.RecoveryCompleted; import akka.persistence.typed.SnapshotFailed; import akka.persistence.typed.SnapshotSelectionCriteria; import akka.persistence.typed.javadsl.CommandHandler; +import akka.persistence.typed.javadsl.Effect; import akka.persistence.typed.javadsl.EventHandler; // #behavior import akka.persistence.typed.javadsl.EventSourcedBehavior; @@ -176,6 +177,112 @@ public class BasicPersistentBehaviorTest { } + interface Effects { + public class MyPersistentBehavior + extends EventSourcedBehavior< + MyPersistentBehavior.Command, MyPersistentBehavior.Event, MyPersistentBehavior.State> { + + interface Command {} + + public static class Add implements Command { + public final String data; + + public Add(String data) { + this.data = data; + } + } + + public enum Clear implements Command { + INSTANCE + } + + interface Event {} + + public static class Added implements Event { + public final String data; + + public Added(String data) { + this.data = data; + } + } + + public enum Cleared implements Event { + INSTANCE + } + + public static class State { + private final List items; + + private State(List items) { + this.items = items; + } + + public State() { + this.items = new ArrayList<>(); + } + + public State addItem(String data) { + List newItems = new ArrayList<>(items); + newItems.add(0, data); + // keep 5 items + List latest = newItems.subList(0, Math.min(4, newItems.size() - 1)); + return new State(latest); + } + } + + public static Behavior create( + PersistenceId persistenceId, ActorRef subscriber) { + return new MyPersistentBehavior(persistenceId, subscriber); + } + + private MyPersistentBehavior(PersistenceId persistenceId, ActorRef subscriber) { + super(persistenceId); + this.subscriber = subscriber; + } + + @Override + public State emptyState() { + return new State(); + } + + // #effects + private final ActorRef subscriber; + + @Override + public CommandHandler commandHandler() { + return newCommandHandlerBuilder() + .forAnyState() + .onCommand(Add.class, this::onAdd) + .onCommand(Clear.class, this::onClear) + .build(); + } + + private Effect onAdd(Add command) { + return Effect() + .persist(new Added(command.data)) + .thenRun(newState -> subscriber.tell(newState)); + } + + private Effect onClear(Clear command) { + return Effect() + .persist(Cleared.INSTANCE) + .thenRun(newState -> subscriber.tell(newState)) + .thenStop(); + } + + // #effects + + @Override + public EventHandler eventHandler() { + return newEventHandlerBuilder() + .forAnyState() + .onEvent(Added.class, (state, event) -> state.addItem(event.data)) + .onEvent(Cleared.class, () -> new State()) + .build(); + } + } + } + interface More { // #supervision diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala index d098b0f3ba..5c7d6882d7 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala @@ -6,6 +6,7 @@ package docs.akka.persistence.typed import scala.concurrent.duration._ +import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.actor.typed.SupervisorStrategy import akka.actor.typed.scaladsl.Behaviors @@ -57,6 +58,17 @@ object BasicPersistentBehaviorCompileOnly { } //#command-handler + //#effects + def onCommand(subscriber: ActorRef[State], state: State, command: Command): Effect[Event, State] = { + command match { + case Add(data) => + Effect.persist(Added(data)).thenRun(newState => subscriber ! newState) + case Clear => + Effect.persist(Cleared).thenRun((newState: State) => subscriber ! newState).thenStop() + } + } + //#effects + //#event-handler val eventHandler: (State, Event) => State = { (state, event) => event match {